From 690f29605a6af535cb7df6ce3820a2c8bc04427a Mon Sep 17 00:00:00 2001 From: xieyichen Date: Fri, 21 Jul 2023 10:05:46 +0800 Subject: [PATCH 1/4] handle remove error --- src/Backups/BackupInDirectory.cpp | 2 +- src/Core/Settings.h | 1 + src/Disks/DiskLocal.cpp | 91 +++++++++++++++++++++--- src/Disks/DiskLocal.h | 6 +- src/Disks/DiskSelector.cpp | 7 +- src/Disks/RemoteDisksCommon.cpp | 7 +- src/Disks/tests/gtest_disk.cpp | 9 ++- src/Disks/tests/gtest_disk_encrypted.cpp | 2 +- src/Interpreters/Context.cpp | 2 +- 9 files changed, 103 insertions(+), 24 deletions(-) diff --git a/src/Backups/BackupInDirectory.cpp b/src/Backups/BackupInDirectory.cpp index f071a21618cd..04744abb64a4 100644 --- a/src/Backups/BackupInDirectory.cpp +++ b/src/Backups/BackupInDirectory.cpp @@ -67,7 +67,7 @@ BackupInDirectory::BackupInDirectory( path = fspath.filename() / ""; dir_path = fs::path(path).parent_path(); /// get path without terminating slash String disk_path = fspath.remove_filename(); - disk = std::make_shared(disk_path, disk_path, 0); + disk = std::make_shared(disk_path, disk_path, context_->getSettingsRef().handle_remove_error_path, 0); } open(); diff --git a/src/Core/Settings.h b/src/Core/Settings.h index ca6839494f2e..6c731ba61dad 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -34,6 +34,7 @@ class IColumn; */ #define COMMON_SETTINGS(M) \ + M(String, handle_remove_error_path, "", "When remove_all throws the directory not empty exception, the undeleted data will be moved to the the setting directory. Empty means that the exception will not be processed.", 0) \ M(UInt64, min_compress_block_size, 65536, "The actual size of the block to compress, if the uncompressed data less than max_compress_block_size is no less than this value and no less than the volume of data for one mark.", 0) \ M(UInt64, max_compress_block_size, 1048576, "The maximum size of blocks of uncompressed data before compressing for writing to a table.", 0) \ M(UInt64, max_block_size, DEFAULT_BLOCK_SIZE, "Maximum block size for reading", 0) \ diff --git a/src/Disks/DiskLocal.cpp b/src/Disks/DiskLocal.cpp index 15879fa499d1..ab96f349b77e 100644 --- a/src/Disks/DiskLocal.cpp +++ b/src/Disks/DiskLocal.cpp @@ -15,11 +15,11 @@ #include #include #include -#include #include #include #include #include +#include namespace CurrentMetrics { @@ -89,7 +89,8 @@ static void loadDiskLocalConfig(const String & name, tmp_path = context->getPath(); // Create tmp disk for getting total disk space. - keep_free_space_bytes = static_cast(DiskLocal("tmp", tmp_path, 0).getTotalSpace() * ratio); + keep_free_space_bytes = static_cast( + DiskLocal("tmp", tmp_path, context->getSettingsRef().handle_remove_error_path, 0).getTotalSpace() * ratio); } } @@ -371,9 +372,61 @@ void DiskLocal::removeDirectory(const String & path) throwFromErrnoWithPath("Cannot rmdir " + fs_path.string(), fs_path, ErrorCodes::CANNOT_RMDIR); } +void DiskLocal::remove_all_for_test(const String & path) +{ + LOG_DEBUG(log, "mock remove_all path: {}", path); + if (path.find("/mnt/cfs") != 0) + { + fs::remove_all(fs::path(path)); + return; + } + for (const auto & entry : std::filesystem::directory_iterator(fs::path(path))) + { + if (entry.is_regular_file() || entry.is_directory()) + { + std::error_code ec = std::make_error_code(std::errc::directory_not_empty); + throw std::filesystem::filesystem_error("Directory not empty", entry.path(), ec); + } + } + fs::remove_all(path); +} + +void DiskLocal::moveFiles(const fs::path & source_dir, const fs::path & target_dir) +{ + fs::path relative_target_path = target_dir / fs::relative(source_dir, disk_path); + if (!fs::exists(relative_target_path.parent_path())) + { + fs::create_directories(relative_target_path.parent_path()); + } + LOG_DEBUG(log, "move files from {} to {}", source_dir.string(), relative_target_path.string()); + fs::rename(source_dir, relative_target_path); +} + void DiskLocal::removeRecursive(const String & path) { - fs::remove_all(fs::path(disk_path) / path); + try + { + fs::remove_all((fs::path(disk_path) / path).string()); + } + catch (const fs::filesystem_error & e) + { + if (e.code() == std::errc::directory_not_empty && handle_remove_error_path != "" && disk_path.find("/mnt/cfs") == 0) + { + LOG_DEBUG( + log, + "remove_all get directory not empty error, try to move, error: {}, disk path: {}, remove path: {}", + e.what(), + disk_path, + (fs::path(disk_path) / path).string()); + moveFiles(e.path1(), fs::path(handle_remove_error_path)); + if (fs::exists(fs::path(disk_path) / path)) + { + removeRecursive(path); + } + } + else + throw; + } } void DiskLocal::listFiles(const String & path, std::vector & file_names) @@ -417,9 +470,8 @@ void DiskLocal::createFile(const String & path) void DiskLocal::setReadOnly(const String & path) { - fs::permissions(fs::path(disk_path) / path, - fs::perms::owner_write | fs::perms::group_write | fs::perms::others_write, - fs::perm_options::remove); + fs::permissions( + fs::path(disk_path) / path, fs::perms::owner_write | fs::perms::group_write | fs::perms::others_write, fs::perm_options::remove); } bool inline isSameDiskType(const IDisk & one, const IDisk & another) @@ -464,17 +516,23 @@ void DiskLocal::applyNewSettings(const Poco::Util::AbstractConfiguration & confi keep_free_space_bytes = new_keep_free_space_bytes; } -DiskLocal::DiskLocal(const String & name_, const String & path_, UInt64 keep_free_space_bytes_) +DiskLocal::DiskLocal(const String & name_, const String & path_, const String & handle_remove_error_path_, UInt64 keep_free_space_bytes_) : name(name_) , disk_path(path_) + , handle_remove_error_path(handle_remove_error_path_) , keep_free_space_bytes(keep_free_space_bytes_) , logger(&Poco::Logger::get("DiskLocal")) { } DiskLocal::DiskLocal( - const String & name_, const String & path_, UInt64 keep_free_space_bytes_, ContextPtr context, UInt64 local_disk_check_period_ms) - : DiskLocal(name_, path_, keep_free_space_bytes_) + const String & name_, + const String & path_, + const String & handle_remove_error_path_, + UInt64 keep_free_space_bytes_, + ContextPtr context, + UInt64 local_disk_check_period_ms) + : DiskLocal(name_, path_, handle_remove_error_path_, keep_free_space_bytes_) { if (local_disk_check_period_ms > 0) disk_checker = std::make_unique(this, context, local_disk_check_period_ms); @@ -666,8 +724,19 @@ void registerDiskLocal(DiskFactory & factory) if (path == disk_ptr->getPath()) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Disk {} and disk {} cannot have the same path ({})", name, disk_name, path); - std::shared_ptr disk - = std::make_shared(name, path, keep_free_space_bytes, context, config.getUInt("local_disk_check_period_ms", 0)); + const Settings & current_settings = context->getSettingsRef(); + std::shared_ptr disk = std::make_shared( + name, + path, + current_settings.handle_remove_error_path, + keep_free_space_bytes, + context, + config.getUInt("local_disk_check_period_ms", 0)); + LOG_DEBUG( + &Poco::Logger::get("DiskLocal"), + "register local disk, name: {}, handle remove error path: {}", + name, + String(current_settings.handle_remove_error_path)); disk->startup(); return std::make_shared(disk); }; diff --git a/src/Disks/DiskLocal.h b/src/Disks/DiskLocal.h index 3ae52ee8b675..6105eb36b068 100644 --- a/src/Disks/DiskLocal.h +++ b/src/Disks/DiskLocal.h @@ -20,10 +20,11 @@ class DiskLocal : public IDisk friend class DiskLocalCheckThread; friend class DiskLocalReservation; - DiskLocal(const String & name_, const String & path_, UInt64 keep_free_space_bytes_); + DiskLocal(const String & name_, const String & path_, const String & handle_remove_error_path, UInt64 keep_free_space_bytes_); DiskLocal( const String & name_, const String & path_, + const String & handle_remove_error_path, UInt64 keep_free_space_bytes_, ContextPtr context, UInt64 local_disk_check_period_ms); @@ -128,10 +129,13 @@ class DiskLocal : public IDisk /// Read magic number from disk checker file. Return std::nullopt if exception happens. std::optional readDiskCheckerMagicNumber() const noexcept; + void remove_all_for_test(const String & path); + void moveFiles(const fs::path & source_dir, const fs::path & target_dir); const String name; const String disk_path; const String disk_checker_path = ".disk_checker_file"; + const String handle_remove_error_path; std::atomic keep_free_space_bytes; Poco::Logger * logger; diff --git a/src/Disks/DiskSelector.cpp b/src/Disks/DiskSelector.cpp index 4c80b128b4bb..ece9f8e633b1 100644 --- a/src/Disks/DiskSelector.cpp +++ b/src/Disks/DiskSelector.cpp @@ -44,7 +44,12 @@ DiskSelector::DiskSelector(const Poco::Util::AbstractConfiguration & config, con disks.emplace( default_disk_name, std::make_shared( - default_disk_name, context->getPath(), 0, context, config.getUInt("local_disk_check_period_ms", 0))); + default_disk_name, + context->getPath(), + context->getSettingsRef().handle_remove_error_path, + 0, + context, + config.getUInt("local_disk_check_period_ms", 0))); } } diff --git a/src/Disks/RemoteDisksCommon.cpp b/src/Disks/RemoteDisksCommon.cpp index 36f2aed3e7ce..9f3cbe1ebead 100644 --- a/src/Disks/RemoteDisksCommon.cpp +++ b/src/Disks/RemoteDisksCommon.cpp @@ -15,8 +15,8 @@ std::shared_ptr wrapWithCache( { if (metadata_path == cache_path) throw Exception("Metadata and cache paths should be different: " + metadata_path, ErrorCodes::BAD_ARGUMENTS); - - auto cache_disk = std::make_shared(cache_name, cache_path, 0); + //cache_disk must be local disk + auto cache_disk = std::make_shared(cache_name, cache_path, "", 0); auto cache_file_predicate = [] (const String & path) { return path.ends_with("idx") // index files. @@ -45,7 +45,8 @@ std::pair prepareForLocalMetadata( /// where the metadata files are stored locally auto metadata_path = getDiskMetadataPath(name, config, config_prefix, context); fs::create_directories(metadata_path); - auto metadata_disk = std::make_shared(name + "-metadata", metadata_path, 0); + auto metadata_disk + = std::make_shared(name + "-metadata", metadata_path, context->getSettingsRef().handle_remove_error_path, 0); return std::make_pair(metadata_path, metadata_disk); } diff --git a/src/Disks/tests/gtest_disk.cpp b/src/Disks/tests/gtest_disk.cpp index 36f912493919..b29701c81a59 100644 --- a/src/Disks/tests/gtest_disk.cpp +++ b/src/Disks/tests/gtest_disk.cpp @@ -1,9 +1,8 @@ -#include -#include -#include #include "gtest_disk.h" #include - +#include +#include +#include namespace fs = std::filesystem; @@ -26,7 +25,7 @@ template <> DB::DiskPtr createDisk() { fs::create_directory("tmp/"); - return std::make_shared("local_disk", "tmp/", 0); + return std::make_shared("local_disk", "tmp/", "", 0); } diff --git a/src/Disks/tests/gtest_disk_encrypted.cpp b/src/Disks/tests/gtest_disk_encrypted.cpp index fd3cc1acbe59..96f6ab86f6b9 100644 --- a/src/Disks/tests/gtest_disk_encrypted.cpp +++ b/src/Disks/tests/gtest_disk_encrypted.cpp @@ -23,7 +23,7 @@ class DiskEncryptedTest : public ::testing::Test /// Make local disk. temp_dir = std::make_unique(); temp_dir->createDirectories(); - local_disk = std::make_shared("local_disk", getDirectory(), 0); + local_disk = std::make_shared("local_disk", getDirectory(), "", 0); } void TearDown() override diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 353b8a854c89..b54a6cee3298 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -612,7 +612,7 @@ VolumePtr Context::setTemporaryStorage(const String & path, const String & polic if (!shared->tmp_path.ends_with('/')) shared->tmp_path += '/'; - auto disk = std::make_shared("_tmp_default", shared->tmp_path, 0); + auto disk = std::make_shared("_tmp_default", shared->tmp_path, getSettingsRef().handle_remove_error_path, 0); shared->tmp_volume = std::make_shared("_tmp_default", disk, 0); } else From fb86e55bf260e1b9572e3b2ac90998676f90fef2 Mon Sep 17 00:00:00 2001 From: xieyichen Date: Tue, 25 Jul 2023 10:43:27 +0800 Subject: [PATCH 2/4] modify diskLocal constructor --- src/Backups/BackupInDirectory.cpp | 2 +- src/Disks/DiskLocal.cpp | 50 +++++++++--------------- src/Disks/DiskLocal.h | 13 ++---- src/Disks/DiskSelector.cpp | 7 +--- src/Disks/RemoteDisksCommon.cpp | 7 ++-- src/Disks/tests/gtest_disk.cpp | 9 +++-- src/Disks/tests/gtest_disk_encrypted.cpp | 2 +- src/Interpreters/Context.cpp | 2 +- 8 files changed, 34 insertions(+), 58 deletions(-) diff --git a/src/Backups/BackupInDirectory.cpp b/src/Backups/BackupInDirectory.cpp index 04744abb64a4..f071a21618cd 100644 --- a/src/Backups/BackupInDirectory.cpp +++ b/src/Backups/BackupInDirectory.cpp @@ -67,7 +67,7 @@ BackupInDirectory::BackupInDirectory( path = fspath.filename() / ""; dir_path = fs::path(path).parent_path(); /// get path without terminating slash String disk_path = fspath.remove_filename(); - disk = std::make_shared(disk_path, disk_path, context_->getSettingsRef().handle_remove_error_path, 0); + disk = std::make_shared(disk_path, disk_path, 0); } open(); diff --git a/src/Disks/DiskLocal.cpp b/src/Disks/DiskLocal.cpp index ab96f349b77e..3278c9d9df44 100644 --- a/src/Disks/DiskLocal.cpp +++ b/src/Disks/DiskLocal.cpp @@ -89,8 +89,7 @@ static void loadDiskLocalConfig(const String & name, tmp_path = context->getPath(); // Create tmp disk for getting total disk space. - keep_free_space_bytes = static_cast( - DiskLocal("tmp", tmp_path, context->getSettingsRef().handle_remove_error_path, 0).getTotalSpace() * ratio); + keep_free_space_bytes = static_cast(DiskLocal("tmp", tmp_path, 0).getTotalSpace() * ratio); } } @@ -391,22 +390,21 @@ void DiskLocal::remove_all_for_test(const String & path) fs::remove_all(path); } -void DiskLocal::moveFiles(const fs::path & source_dir, const fs::path & target_dir) +void DiskLocal::renameFiles(const fs::path & source_dir, const fs::path & target_dir) { - fs::path relative_target_path = target_dir / fs::relative(source_dir, disk_path); - if (!fs::exists(relative_target_path.parent_path())) + if (!fs::exists(target_dir.parent_path())) { - fs::create_directories(relative_target_path.parent_path()); + fs::create_directories(target_dir.parent_path()); } - LOG_DEBUG(log, "move files from {} to {}", source_dir.string(), relative_target_path.string()); - fs::rename(source_dir, relative_target_path); + LOG_DEBUG(log, "move files from {} to {}", source_dir.string(), target_dir.string()); + fs::rename(source_dir, target_dir); } void DiskLocal::removeRecursive(const String & path) { try { - fs::remove_all((fs::path(disk_path) / path).string()); + fs::remove_all(fs::path(disk_path) / path); } catch (const fs::filesystem_error & e) { @@ -418,7 +416,8 @@ void DiskLocal::removeRecursive(const String & path) e.what(), disk_path, (fs::path(disk_path) / path).string()); - moveFiles(e.path1(), fs::path(handle_remove_error_path)); + fs::path to_path = fs::path(handle_remove_error_path) / fs::relative(e.path1(), disk_path); + renameFiles(e.path1(), to_path); if (fs::exists(fs::path(disk_path) / path)) { removeRecursive(path); @@ -516,26 +515,19 @@ void DiskLocal::applyNewSettings(const Poco::Util::AbstractConfiguration & confi keep_free_space_bytes = new_keep_free_space_bytes; } -DiskLocal::DiskLocal(const String & name_, const String & path_, const String & handle_remove_error_path_, UInt64 keep_free_space_bytes_) - : name(name_) - , disk_path(path_) - , handle_remove_error_path(handle_remove_error_path_) - , keep_free_space_bytes(keep_free_space_bytes_) - , logger(&Poco::Logger::get("DiskLocal")) +DiskLocal::DiskLocal(const String & name_, const String & path_, UInt64 keep_free_space_bytes_) + : name(name_), disk_path(path_), keep_free_space_bytes(keep_free_space_bytes_), logger(&Poco::Logger::get("DiskLocal")) { } DiskLocal::DiskLocal( - const String & name_, - const String & path_, - const String & handle_remove_error_path_, - UInt64 keep_free_space_bytes_, - ContextPtr context, - UInt64 local_disk_check_period_ms) - : DiskLocal(name_, path_, handle_remove_error_path_, keep_free_space_bytes_) + const String & name_, const String & path_, UInt64 keep_free_space_bytes_, ContextPtr context, UInt64 local_disk_check_period_ms) + : DiskLocal(name_, path_, keep_free_space_bytes_) { if (local_disk_check_period_ms > 0) disk_checker = std::make_unique(this, context, local_disk_check_period_ms); + const auto & settings = context->getSettingsRef(); + handle_remove_error_path = String(settings.handle_remove_error_path); } void DiskLocal::startup() @@ -724,19 +716,13 @@ void registerDiskLocal(DiskFactory & factory) if (path == disk_ptr->getPath()) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Disk {} and disk {} cannot have the same path ({})", name, disk_name, path); - const Settings & current_settings = context->getSettingsRef(); - std::shared_ptr disk = std::make_shared( - name, - path, - current_settings.handle_remove_error_path, - keep_free_space_bytes, - context, - config.getUInt("local_disk_check_period_ms", 0)); + std::shared_ptr disk + = std::make_shared(name, path, keep_free_space_bytes, context, config.getUInt("local_disk_check_period_ms", 0)); LOG_DEBUG( &Poco::Logger::get("DiskLocal"), "register local disk, name: {}, handle remove error path: {}", name, - String(current_settings.handle_remove_error_path)); + String(context->getSettingsRef().handle_remove_error_path)); disk->startup(); return std::make_shared(disk); }; diff --git a/src/Disks/DiskLocal.h b/src/Disks/DiskLocal.h index 6105eb36b068..a297fbfb5184 100644 --- a/src/Disks/DiskLocal.h +++ b/src/Disks/DiskLocal.h @@ -20,14 +20,9 @@ class DiskLocal : public IDisk friend class DiskLocalCheckThread; friend class DiskLocalReservation; - DiskLocal(const String & name_, const String & path_, const String & handle_remove_error_path, UInt64 keep_free_space_bytes_); + DiskLocal(const String & name_, const String & path_, UInt64 keep_free_space_bytes_); DiskLocal( - const String & name_, - const String & path_, - const String & handle_remove_error_path, - UInt64 keep_free_space_bytes_, - ContextPtr context, - UInt64 local_disk_check_period_ms); + const String & name_, const String & path_, UInt64 keep_free_space_bytes_, ContextPtr context, UInt64 local_disk_check_period_ms); const String & getName() const override { return name; } @@ -130,12 +125,12 @@ class DiskLocal : public IDisk /// Read magic number from disk checker file. Return std::nullopt if exception happens. std::optional readDiskCheckerMagicNumber() const noexcept; void remove_all_for_test(const String & path); - void moveFiles(const fs::path & source_dir, const fs::path & target_dir); + void renameFiles(const fs::path & source_dir, const fs::path & target_dir); const String name; const String disk_path; const String disk_checker_path = ".disk_checker_file"; - const String handle_remove_error_path; + String handle_remove_error_path; std::atomic keep_free_space_bytes; Poco::Logger * logger; diff --git a/src/Disks/DiskSelector.cpp b/src/Disks/DiskSelector.cpp index ece9f8e633b1..4c80b128b4bb 100644 --- a/src/Disks/DiskSelector.cpp +++ b/src/Disks/DiskSelector.cpp @@ -44,12 +44,7 @@ DiskSelector::DiskSelector(const Poco::Util::AbstractConfiguration & config, con disks.emplace( default_disk_name, std::make_shared( - default_disk_name, - context->getPath(), - context->getSettingsRef().handle_remove_error_path, - 0, - context, - config.getUInt("local_disk_check_period_ms", 0))); + default_disk_name, context->getPath(), 0, context, config.getUInt("local_disk_check_period_ms", 0))); } } diff --git a/src/Disks/RemoteDisksCommon.cpp b/src/Disks/RemoteDisksCommon.cpp index 9f3cbe1ebead..36f2aed3e7ce 100644 --- a/src/Disks/RemoteDisksCommon.cpp +++ b/src/Disks/RemoteDisksCommon.cpp @@ -15,8 +15,8 @@ std::shared_ptr wrapWithCache( { if (metadata_path == cache_path) throw Exception("Metadata and cache paths should be different: " + metadata_path, ErrorCodes::BAD_ARGUMENTS); - //cache_disk must be local disk - auto cache_disk = std::make_shared(cache_name, cache_path, "", 0); + + auto cache_disk = std::make_shared(cache_name, cache_path, 0); auto cache_file_predicate = [] (const String & path) { return path.ends_with("idx") // index files. @@ -45,8 +45,7 @@ std::pair prepareForLocalMetadata( /// where the metadata files are stored locally auto metadata_path = getDiskMetadataPath(name, config, config_prefix, context); fs::create_directories(metadata_path); - auto metadata_disk - = std::make_shared(name + "-metadata", metadata_path, context->getSettingsRef().handle_remove_error_path, 0); + auto metadata_disk = std::make_shared(name + "-metadata", metadata_path, 0); return std::make_pair(metadata_path, metadata_disk); } diff --git a/src/Disks/tests/gtest_disk.cpp b/src/Disks/tests/gtest_disk.cpp index b29701c81a59..36f912493919 100644 --- a/src/Disks/tests/gtest_disk.cpp +++ b/src/Disks/tests/gtest_disk.cpp @@ -1,8 +1,9 @@ -#include "gtest_disk.h" -#include +#include #include #include -#include +#include "gtest_disk.h" +#include + namespace fs = std::filesystem; @@ -25,7 +26,7 @@ template <> DB::DiskPtr createDisk() { fs::create_directory("tmp/"); - return std::make_shared("local_disk", "tmp/", "", 0); + return std::make_shared("local_disk", "tmp/", 0); } diff --git a/src/Disks/tests/gtest_disk_encrypted.cpp b/src/Disks/tests/gtest_disk_encrypted.cpp index 96f6ab86f6b9..fd3cc1acbe59 100644 --- a/src/Disks/tests/gtest_disk_encrypted.cpp +++ b/src/Disks/tests/gtest_disk_encrypted.cpp @@ -23,7 +23,7 @@ class DiskEncryptedTest : public ::testing::Test /// Make local disk. temp_dir = std::make_unique(); temp_dir->createDirectories(); - local_disk = std::make_shared("local_disk", getDirectory(), "", 0); + local_disk = std::make_shared("local_disk", getDirectory(), 0); } void TearDown() override diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index b54a6cee3298..353b8a854c89 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -612,7 +612,7 @@ VolumePtr Context::setTemporaryStorage(const String & path, const String & polic if (!shared->tmp_path.ends_with('/')) shared->tmp_path += '/'; - auto disk = std::make_shared("_tmp_default", shared->tmp_path, getSettingsRef().handle_remove_error_path, 0); + auto disk = std::make_shared("_tmp_default", shared->tmp_path, 0); shared->tmp_volume = std::make_shared("_tmp_default", disk, 0); } else From 2d95546db0a9e4dbce13f38ee812eb84bb17d0d7 Mon Sep 17 00:00:00 2001 From: xieyichen Date: Thu, 2 Nov 2023 16:49:33 +0800 Subject: [PATCH 3/4] support cubefs sdk --- .gitmodules | 4 + contrib/CMakeLists.txt | 1 + contrib/cubefs | 1 + contrib/cubefs-cmake/CMakeLists.txt | 30 + programs/CMakeLists.txt | 25 + programs/config_tools.h.in | 1 + programs/cubefs-test/CMakeLists.txt | 4 + .../cubefs-test/clickhouse-cubefs-test.cpp | 5 + programs/cubefs-test/cubefs-dlopen.cpp | 165 +++++ programs/cubefs-test/cubefs-test.cpp | 103 +++ programs/main.cpp | 6 + src/CMakeLists.txt | 13 +- src/Disks/CubeFS/DiskCubeFS.cpp | 675 ++++++++++++++++++ src/Disks/CubeFS/DiskCubeFS.h | 110 +++ src/Disks/CubeFS/registerDiskCubeFS.cpp | 49 ++ src/Disks/CubeFS/sdkLoader.cpp | 48 ++ src/Disks/CubeFS/sdkLoader.h | 63 ++ src/Disks/DiskType.h | 3 + src/Disks/registerDisks.cpp | 3 +- src/Disks/tests/gtest_disk_cubefs.cpp | 256 +++++++ src/IO/ReadBufferFromCubeFS.cpp | 195 +++++ src/IO/ReadBufferFromCubeFS.h | 52 ++ src/IO/WriteBufferFromCubeFS.cpp | 122 ++++ src/IO/WriteBufferFromCubeFS.h | 38 + 24 files changed, 1970 insertions(+), 2 deletions(-) create mode 160000 contrib/cubefs create mode 100644 contrib/cubefs-cmake/CMakeLists.txt create mode 100644 programs/cubefs-test/CMakeLists.txt create mode 100644 programs/cubefs-test/clickhouse-cubefs-test.cpp create mode 100644 programs/cubefs-test/cubefs-dlopen.cpp create mode 100644 programs/cubefs-test/cubefs-test.cpp create mode 100644 src/Disks/CubeFS/DiskCubeFS.cpp create mode 100644 src/Disks/CubeFS/DiskCubeFS.h create mode 100644 src/Disks/CubeFS/registerDiskCubeFS.cpp create mode 100644 src/Disks/CubeFS/sdkLoader.cpp create mode 100644 src/Disks/CubeFS/sdkLoader.h create mode 100644 src/Disks/tests/gtest_disk_cubefs.cpp create mode 100644 src/IO/ReadBufferFromCubeFS.cpp create mode 100644 src/IO/ReadBufferFromCubeFS.h create mode 100644 src/IO/WriteBufferFromCubeFS.cpp create mode 100644 src/IO/WriteBufferFromCubeFS.h diff --git a/.gitmodules b/.gitmodules index 6c9e66f9cbc7..4ad531684245 100644 --- a/.gitmodules +++ b/.gitmodules @@ -262,3 +262,7 @@ [submodule "contrib/minizip-ng"] path = contrib/minizip-ng url = https://github.com/zlib-ng/minizip-ng +[submodule "contrib/cubefs"] + path = contrib/cubefs + url = https://github.com/RunningXie/cubefs.git + branch = fix_sdk_compilation diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index 9cf307c473ee..e1e18fcd6e1b 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -135,6 +135,7 @@ add_contrib (libpq-cmake libpq) add_contrib (nuraft-cmake NuRaft) add_contrib (fast_float-cmake fast_float) add_contrib (datasketches-cpp-cmake datasketches-cpp) +add_contrib (cubefs-cmake cubefs) option(ENABLE_NLP "Enable NLP functions support" ${ENABLE_LIBRARIES}) if (ENABLE_NLP) diff --git a/contrib/cubefs b/contrib/cubefs new file mode 160000 index 000000000000..5eb497912217 --- /dev/null +++ b/contrib/cubefs @@ -0,0 +1 @@ +Subproject commit 5eb4979122170f02e2c2fc4e14f46f6092c4e08e diff --git a/contrib/cubefs-cmake/CMakeLists.txt b/contrib/cubefs-cmake/CMakeLists.txt new file mode 100644 index 000000000000..abec8b08c765 --- /dev/null +++ b/contrib/cubefs-cmake/CMakeLists.txt @@ -0,0 +1,30 @@ +cmake_minimum_required(VERSION 3.20) +message(STATUS "start handle contrib/cubefs-cmake/CMakeLists.txt") +set(CUBEFS_DIR "${ClickHouse_SOURCE_DIR}/contrib/cubefs") +message(STATUS "start execute_process") +execute_process( + COMMAND make libsdk + WORKING_DIRECTORY ${CUBEFS_DIR}) +add_library(_cubefs STATIC IMPORTED GLOBAL) +set_target_properties(_cubefs PROPERTIES + IMPORTED_LOCATION "${CUBEFS_DIR}/build/bin/libcfs.a" +) +add_library(ch_contrib::cubefs ALIAS _cubefs) +set(FILE_PATH "${CUBEFS_DIR}/libsdk/libcfs.h") +# Add configuration to ignore warnings in the header file +file(STRINGS ${FILE_PATH} FILE_CONTENTS) +list(FIND FILE_CONTENTS "#pragma GCC diagnostic push" PRAGMA_INDEX) +if (PRAGMA_INDEX EQUAL -1) + execute_process( + COMMAND sh -c "echo '#pragma GCC diagnostic push\n#pragma GCC diagnostic ignored \"-Wreserved-identifier\"\n#pragma GCC diagnostic ignored \"-Wmacro-redefined\"\n' | cat - ${FILE_PATH}" + OUTPUT_FILE "${FILE_PATH}.tmp" + ) + execute_process( + COMMAND sh -c "echo '#pragma GCC diagnostic pop' >> ${FILE_PATH}.tmp" + ) + execute_process( + COMMAND mv "${FILE_PATH}.tmp" "${FILE_PATH}" + ) +endif() + + diff --git a/programs/CMakeLists.txt b/programs/CMakeLists.txt index cca7be97b61b..629e0c0fc8a0 100644 --- a/programs/CMakeLists.txt +++ b/programs/CMakeLists.txt @@ -24,6 +24,9 @@ option (ENABLE_CLICKHOUSE_EXTRACT_FROM_CONFIG "Configs processor (extract values # https://clickhouse.com/docs/en/operations/utilities/clickhouse-compressor/ option (ENABLE_CLICKHOUSE_COMPRESSOR "Data compressor and decompressor" ${ENABLE_CLICKHOUSE_ALL}) +# cubefs test +option (ENABLE_CLICKHOUSE_CUBEFS_TEST "Cubefs test" ${ENABLE_CLICKHOUSE_ALL}) + # https://clickhouse.com/docs/en/operations/utilities/clickhouse-copier/ option (ENABLE_CLICKHOUSE_COPIER "Inter-cluster data copying mode" ${ENABLE_CLICKHOUSE_ALL}) @@ -114,6 +117,12 @@ else() message(STATUS "Copier mode: OFF") endif() +if (ENABLE_CLICKHOUSE_CUBEFS_TEST) + message(STATUS "Cubefs test: ON") +else() + message(STATUS "Cubefs test: OFF") +endif() + if (ENABLE_CLICKHOUSE_FORMAT) message(STATUS "Format mode: ON") else() @@ -230,6 +239,7 @@ add_subdirectory (install) add_subdirectory (git-import) add_subdirectory (bash-completion) add_subdirectory (static-files-disk-uploader) +add_subdirectory (cubefs-test) if (ENABLE_CLICKHOUSE_KEEPER) add_subdirectory (keeper) @@ -261,6 +271,7 @@ if (CLICKHOUSE_ONE_SHARED) ${CLICKHOUSE_GIT_IMPORT_SOURCES} ${CLICKHOUSE_ODBC_BRIDGE_SOURCES} ${CLICKHOUSE_KEEPER_SOURCES} + ${CLICKHOUSE_CUBEFS_TEST_SOURCES} ${CLICKHOUSE_KEEPER_CONVERTER_SOURCES} ${CLICKHOUSE_STATIC_FILES_DISK_UPLOADER_SOURCES}) @@ -277,6 +288,7 @@ if (CLICKHOUSE_ONE_SHARED) ${CLICKHOUSE_GIT_IMPORT_LINK} ${CLICKHOUSE_ODBC_BRIDGE_LINK} ${CLICKHOUSE_KEEPER_LINK} + ${CLICKHOUSE_CUBEFS_TEST_LINK} ${CLICKHOUSE_KEEPER_CONVERTER_LINK} ${CLICKHOUSE_STATIC_FILES_DISK_UPLOADER_LINK}) @@ -293,6 +305,7 @@ if (CLICKHOUSE_ONE_SHARED) ${CLICKHOUSE_GIT_IMPORT_INCLUDE} ${CLICKHOUSE_ODBC_BRIDGE_INCLUDE} ${CLICKHOUSE_KEEPER_INCLUDE} + ${CLICKHOUSE_CUBEFS_TEST_INCLUDE} ${CLICKHOUSE_KEEPER_CONVERTER_INCLUDE}) set_target_properties(clickhouse-lib PROPERTIES SOVERSION ${VERSION_MAJOR}.${VERSION_MINOR} VERSION ${VERSION_SO} OUTPUT_NAME clickhouse DEBUG_POSTFIX "") @@ -330,6 +343,10 @@ if (CLICKHOUSE_SPLIT_BINARY) list (APPEND CLICKHOUSE_ALL_TARGETS clickhouse-keeper-converter) endif () + if (ENABLE_CLICKHOUSE_CUBEFS_TEST) + list (APPEND CLICKHOUSE_ALL_TARGETS clickhouse-cubefs-test) + endif () + set_target_properties(${CLICKHOUSE_ALL_TARGETS} PROPERTIES RUNTIME_OUTPUT_DIRECTORY ..) add_custom_target (clickhouse-bundle ALL DEPENDS ${CLICKHOUSE_ALL_TARGETS}) @@ -356,6 +373,9 @@ else () if (ENABLE_CLICKHOUSE_LOCAL) clickhouse_target_link_split_lib(clickhouse local) endif () + if (ENABLE_CLICKHOUSE_CUBEFS_TEST) + clickhouse_target_link_split_lib(clickhouse cubefs-test) + endif () if (ENABLE_CLICKHOUSE_BENCHMARK) clickhouse_target_link_split_lib(clickhouse benchmark) endif () @@ -416,6 +436,11 @@ else () install (FILES "${CMAKE_CURRENT_BINARY_DIR}/clickhouse-copier" DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse) list(APPEND CLICKHOUSE_BUNDLE clickhouse-copier) endif () + if (ENABLE_CLICKHOUSE_CUBEFS_TEST) + add_custom_target (clickhouse-cubefs-test ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-cubefs-test DEPENDS clickhouse) + install (FILES "${CMAKE_CURRENT_BINARY_DIR}/clickhouse-cubefs-test" DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse) + list(APPEND CLICKHOUSE_BUNDLE clickhouse-cubefs-test) + endif () if (ENABLE_CLICKHOUSE_EXTRACT_FROM_CONFIG) add_custom_target (clickhouse-extract-from-config ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-extract-from-config DEPENDS clickhouse) install (FILES "${CMAKE_CURRENT_BINARY_DIR}/clickhouse-extract-from-config" DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse) diff --git a/programs/config_tools.h.in b/programs/config_tools.h.in index b97eb63b5352..30f890045696 100644 --- a/programs/config_tools.h.in +++ b/programs/config_tools.h.in @@ -19,3 +19,4 @@ #cmakedefine01 ENABLE_CLICKHOUSE_KEEPER #cmakedefine01 ENABLE_CLICKHOUSE_KEEPER_CONVERTER #cmakedefine01 ENABLE_CLICKHOUSE_STATIC_FILES_DISK_UPLOADER +#cmakedefine01 ENABLE_CLICKHOUSE_CUBEFS_TEST diff --git a/programs/cubefs-test/CMakeLists.txt b/programs/cubefs-test/CMakeLists.txt new file mode 100644 index 000000000000..54f1628419a6 --- /dev/null +++ b/programs/cubefs-test/CMakeLists.txt @@ -0,0 +1,4 @@ +set (CLICKHOUSE_CUBEFS_TEST_SOURCES cubefs-test.cpp clickhouse-cubefs-test.cpp) +set (CLICKHOUSE_CUBEFS_TEST_LINK dbms ch_contrib::cubefs) +set (CLICKHOUSE_CUBEFS_TEST_INCLUDE PUBLIC ${ClickHouse_SOURCE_DIR}/contrib/cubefs/libsdk) +clickhouse_program_add(cubefs-test) diff --git a/programs/cubefs-test/clickhouse-cubefs-test.cpp b/programs/cubefs-test/clickhouse-cubefs-test.cpp new file mode 100644 index 000000000000..384aea9916dc --- /dev/null +++ b/programs/cubefs-test/clickhouse-cubefs-test.cpp @@ -0,0 +1,5 @@ +int mainEntryClickHouseCubefsTest(int argc, char ** argv); +int main(int argc_, char ** argv_) +{ + return mainEntryClickHouseCubefsTest(argc_, argv_); +} diff --git a/programs/cubefs-test/cubefs-dlopen.cpp b/programs/cubefs-test/cubefs-dlopen.cpp new file mode 100644 index 000000000000..996106eff82b --- /dev/null +++ b/programs/cubefs-test/cubefs-dlopen.cpp @@ -0,0 +1,165 @@ +#include +#include +#include +#include + + +int main() +{ + void * handle = dlopen("./libcfs.so", RTLD_LAZY); + if (!handle) + { + std::cerr << "Failed to open the dynamic library: " << dlerror() << std::endl; + return 1; + } + typedef int64_t (*CfsNewClientFunction)(); + typedef int (*CfsSetClientFunction)(int64_t, char *, char *); + typedef int (*CfsStartClientFunction)(int64_t); + typedef void (*CfsCloseClientFunction)(int64_t); + typedef int (*CfsChdirFunction)(int64_t, char *); + typedef char * (*CfsGetcwdFunction)(int64_t); + typedef int (*CfsGetattrFunction)(int64_t, char *, struct cfs_stat_info *); + typedef int (*CfsSetattrFunction)(int64_t, char *, struct cfs_stat_info *, int); + typedef int (*CfsOpenFunction)(int64_t, char *, int, mode_t); + typedef int (*CfsFlushFunction)(int64_t, int); + typedef void (*CfsCloseFunction)(int64_t, int); + typedef ssize_t (*CfsWriteFunction)(int64_t, int, void *, size_t, off_t); + typedef ssize_t (*CfsReadFunction)(int64_t, int, void *, size_t, off_t); + typedef int (*CfsBatchGetInodesFunction)(int64_t, int, void *, GoSlice, int); + typedef int (*CfsRefreshSummaryFunction)(int64_t, char *, int); + typedef int (*CfsReaddirFunction)(int64_t, int, GoSlice, int); + typedef int (*CfsLsdirFunction)(int64_t, int, GoSlice, int); + typedef int (*CfsMkdirsFunction)(int64_t, char *, mode_t); + typedef int (*CfsRmdirFunction)(int64_t, char *); + typedef int (*CfsUnlinkFunction)(int64_t, char *); + typedef int (*CfsRenameFunction)(int64_t, char *, char *); + typedef int (*CfsFchmodFunction)(int64_t, int, mode_t); + typedef int (*CfsGetsummaryFunction)(int64_t, char *, struct cfs_summary_info *, char *, int); + CfsNewClientFunction cfsNewClient = (CfsNewClientFunction)dlsym(handle, "cfs_new_client"); + CfsSetClientFunction cfsSetClient = (CfsSetClientFunction)dlsym(handle, "cfs_set_client"); + CfsStartClientFunction cfsStartClient = (CfsStartClientFunction)dlsym(handle, "cfs_start_client"); + CfsCloseClientFunction cfsCloseClient = (CfsCloseClientFunction)dlsym(handle, "cfs_close_client"); + CfsChdirFunction cfsChdir = (CfsChdirFunction)dlsym(handle, "cfs_chdir"); + CfsGetcwdFunction cfsGetcwd = (CfsGetcwdFunction)dlsym(handle, "cfs_getcwd"); + CfsGetattrFunction cfsGetattr = (CfsGetattrFunction)dlsym(handle, "cfs_getattr"); + CfsSetattrFunction cfsSetattr = (CfsSetattrFunction)dlsym(handle, "cfs_setattr"); + CfsOpenFunction cfsOpen = (CfsOpenFunction)dlsym(handle, "cfs_open"); + CfsFlushFunction cfsFlush = (CfsFlushFunction)dlsym(handle, "cfs_flush"); + CfsCloseFunction cfsClose = (CfsCloseFunction)dlsym(handle, "cfs_close"); + CfsLsdirFunction cfsLsdir = (CfsLsdirFunction)dlsym(handle, "cfs_lsdir"); + CfsMkdirsFunction cfsMkdirs = (CfsMkdirsFunction)dlsym(handle, "cfs_mkdirs"); + CfsRmdirFunction cfsRmdir = (CfsRmdirFunction)dlsym(handle, "cfs_rmdir"); + CfsUnlinkFunction cfsUnlink = (CfsUnlinkFunction)dlsym(handle, "cfs_unlink"); + CfsRenameFunction cfsRename = (CfsRenameFunction)dlsym(handle, "cfs_rename"); + CfsFchmodFunction cfsFchmod = (CfsFchmodFunction)dlsym(handle, "cfs_fchmod"); + CfsGetsummaryFunction cfsGetsummary = (CfsGetsummaryFunction)dlsym(handle, "cfs_getsummary"); + CfsWriteFunction cfsWrite = (CfsWriteFunction)dlsym(handle, "cfs_write"); + CfsReadFunction cfsRead = (CfsReadFunction)dlsym(handle, "cfs_read"); + if (!cfsNewClient || !cfsSetClient || !cfsStartClient || !cfsCloseClient || !cfsChdir || !cfsGetcwd || !cfsGetattr || !cfsSetattr + || !cfsOpen || !cfsFlush || !cfsClose || !cfsWrite || !cfsRead || !cfsLsdir || !cfsMkdirs || !cfsRmdir || !cfsUnlink || !cfsRename + || !cfsFchmod || !cfsGetsummary) + { + std::cerr << "Failed to find one or more functions: " << dlerror() << std::endl; + dlclose(handle); + return 1; + } + int64_t clientId = cfsNewClient(); + if (clientId == 0) + { + std::cerr << "Failed to create client" << std::endl; + dlclose(handle); + return 1; + } + int result = cfsSetClient(clientId, strdup("volName"), strdup("xieyichen")); + if (result != 0) + { + std::cerr << "Failed to set client" << std::endl; + cfsCloseClient(clientId); + dlclose(handle); + return 1; + } + result = cfsSetClient(clientId, strdup("masterAddr"), strdup("cfs-south.oppo.local")); + if (result != 0) + { + std::cerr << "Failed to set client" << std::endl; + cfsCloseClient(clientId); + dlclose(handle); + return 1; + } + result = cfsSetClient(clientId, strdup("logDir"), strdup("/home/service/var/logs/cfs/test-log")); + if (result != 0) + { + std::cerr << "Failed to set client" << std::endl; + cfsCloseClient(clientId); + dlclose(handle); + return 1; + } + result = cfsSetClient(clientId, strdup("logLevel"), strdup("debug")); + if (result != 0) + { + std::cerr << "Failed to set client" << std::endl; + cfsCloseClient(clientId); + dlclose(handle); + return 1; + } + result = cfsSetClient(clientId, strdup("accessKey"), strdup("jRlZO65q7XlH5bnV")); + if (result != 0) + { + std::cerr << "Failed to set client" << std::endl; + cfsCloseClient(clientId); + dlclose(handle); + return 1; + } + result = cfsSetClient(clientId, strdup("secretKey"), strdup("V1m730UzREHaK1jCkC0kL0cewOX0kH3K")); + if (result != 0) + { + std::cerr << "Failed to set client" << std::endl; + cfsCloseClient(clientId); + dlclose(handle); + return 1; + } + result = cfsSetClient(clientId, strdup("pushAddr"), strdup("cfs.dg-push.wanyol.com")); + if (result != 0) + { + std::cerr << "Failed to set client" << std::endl; + cfsCloseClient(clientId); + dlclose(handle); + return 1; + } + result = cfsStartClient(clientId); + if (result != 0) + { + std::cerr << "Failed to start client" << std::endl; + cfsCloseClient(clientId); + dlclose(handle); + return 1; + } + int fd = cfsOpen(clientId, strdup("/test_dir/file.txt"), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR); + if (fd < 0) + { + printf("Failed to open file\n"); + cfsCloseClient(clientId); + return -1; + } + const char * data = "xieyichen"; + if (cfsWrite(clientId, fd, static_cast(const_cast(data)), strlen(data), 0) < 0) + { + printf("Failed to write file\n"); + cfsClose(clientId, fd); + cfsCloseClient(clientId); + return -1; + } + char buffer[1024]; + memset(buffer, 0, sizeof(buffer)); + if (cfsRead(clientId, fd, static_cast(const_cast(buffer)), sizeof(buffer), 0) < 0) + { + printf("Failed to read file\n"); + cfsClose(clientId, fd); + cfsCloseClient(clientId); + return -1; + } + printf("Read file content: %s\n", buffer); + cfsCloseClient(clientId); + dlclose(handle); + return 0; +} diff --git a/programs/cubefs-test/cubefs-test.cpp b/programs/cubefs-test/cubefs-test.cpp new file mode 100644 index 000000000000..a1c54ed1baa2 --- /dev/null +++ b/programs/cubefs-test/cubefs-test.cpp @@ -0,0 +1,103 @@ +#include +#include +#include +#include +#include +#include + +void readWriteTest() +{ + int64_t id = cfs_new_client(); + if (id <= 0) + { + printf("Failed to create client\n"); + return; + } + if (cfs_set_client(id, strdup("volName"), strdup("xieyichen")) != 0) + { + printf("Failed to set client info\n"); + cfs_close_client(id); + return; + } + if (cfs_set_client(id, strdup("masterAddr"), strdup("cfs-south.oppo.local")) != 0) + { + printf("Failed to set client info\n"); + cfs_close_client(id); + return; + } + if (cfs_set_client(id, strdup("logDir"), strdup("/home/service/var/logs/cfs/test-log")) != 0) + { + printf("Failed to set client info\n"); + cfs_close_client(id); + return; + } + if (cfs_set_client(id, strdup("logLevel"), strdup("debug")) != 0) + { + printf("Failed to set client info\n"); + cfs_close_client(id); + return; + } + if (cfs_set_client(id, strdup("accessKey"), strdup("jRlZO65q7XlH5bnV")) != 0) + { + printf("Failed to set client info\n"); + cfs_close_client(id); + return; + } + if (cfs_set_client(id, strdup("secretKey"), strdup("V1m730UzREHaK1jCkC0kL0cewOX0kH3K")) != 0) + { + printf("Failed to set client info\n"); + cfs_close_client(id); + return; + } + if (cfs_set_client(id, strdup("pushAddr"), strdup("cfs.dg-push.wanyol.com")) != 0) + { + printf("Failed to set client info\n"); + cfs_close_client(id); + return; + } + if (cfs_start_client(id) != 0) + { + printf("Failed to start client \n"); + cfs_close_client(id); + return; + } + int fd = cfs_open(id, strdup("/test_dir/file.txt"), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR); + if (fd < 0) + { + printf("Failed to open file\n"); + cfs_close_client(id); + return; + } + + const char * data = "Hello, world!"; + if (cfs_write(id, fd, static_cast(const_cast(data)), strlen(data), 0) < 0) + { + printf("Failed to write file\n"); + cfs_close(id, fd); + cfs_close_client(id); + return; + } + + char buffer[1024]; + memset(buffer, 0, sizeof(buffer)); + if (cfs_read(id, fd, static_cast(const_cast(buffer)), sizeof(buffer), 0) < 0) + { + printf("Failed to read file\n"); + cfs_close(id, fd); + cfs_close_client(id); + return; + } + + printf("Read file content: %s\n", buffer); + cfs_close(id, fd); + cfs_close_client(id); +} + +int mainEntryClickHouseCubefsTest(int argc, char ** argv) +{ + std::thread worker(readWriteTest); + worker.join(); + std::cout << "argc = " << argc << std::endl; + std::cout << "argv = " << argv << std::endl; + return 0; +} diff --git a/programs/main.cpp b/programs/main.cpp index 2cdda075ca7d..f8573e13ce78 100644 --- a/programs/main.cpp +++ b/programs/main.cpp @@ -63,6 +63,9 @@ int mainEntryClickHouseKeeperConverter(int argc, char ** argv); #if ENABLE_CLICKHOUSE_STATIC_FILES_DISK_UPLOADER int mainEntryClickHouseStaticFilesDiskUploader(int argc, char ** argv); #endif +#if ENABLE_CLICKHOUSE_CUBEFS_TEST +int mainEntryClickHouseCubefsTest(int argc, char ** argv); +#endif #if ENABLE_CLICKHOUSE_INSTALL int mainEntryClickHouseInstall(int argc, char ** argv); int mainEntryClickHouseStart(int argc, char ** argv); @@ -127,6 +130,9 @@ std::pair clickhouse_applications[] = #if ENABLE_CLICKHOUSE_KEEPER_CONVERTER {"keeper-converter", mainEntryClickHouseKeeperConverter}, #endif +#if ENABLE_CLICKHOUSE_CUBEFS_TEST + {"cubefs-test", mainEntryClickHouseCubefsTest}, +#endif #if ENABLE_CLICKHOUSE_INSTALL {"install", mainEntryClickHouseInstall}, {"start", mainEntryClickHouseStart}, diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index b24181625d3f..22171706fb8d 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -75,12 +75,13 @@ add_subdirectory (Coordination) set(dbms_headers) set(dbms_sources) - +message("ClickHouse_SOURCE_DIR: ${ClickHouse_SOURCE_DIR}") add_headers_and_sources(clickhouse_common_io Common) add_headers_and_sources(clickhouse_common_io Common/HashTable) add_headers_and_sources(clickhouse_common_io IO) add_headers_and_sources(clickhouse_common_io IO/Archives) add_headers_and_sources(clickhouse_common_io IO/S3) +add_headers_and_sources(clickhouse_common_io ${ClickHouse_SOURCE_DIR}/contrib/cubefs/libsdk) list (REMOVE_ITEM clickhouse_common_io_sources Common/malloc.cpp Common/new_delete.cpp) add_headers_and_sources(dbms Disks/IO) @@ -111,6 +112,11 @@ if (TARGET ch_contrib::aws_s3) add_headers_and_sources(dbms Disks/S3) endif() +if (TARGET ch_contrib::cubefs) + add_headers_and_sources(dbms Disks/CubeFS) + add_headers_and_sources(dbms ${ClickHouse_SOURCE_DIR}/contrib/cubefs/libsdk) +endif() + if (TARGET ch_contrib::azure_sdk) add_headers_and_sources(dbms Disks/AzureBlobStorage) endif() @@ -304,6 +310,7 @@ endmacro () dbms_target_include_directories (PUBLIC "${ClickHouse_SOURCE_DIR}/src" "${ClickHouse_BINARY_DIR}/src") target_include_directories (clickhouse_common_io PUBLIC "${ClickHouse_SOURCE_DIR}/src" "${ClickHouse_BINARY_DIR}/src") +target_include_directories(clickhouse_common_io PUBLIC ${ClickHouse_SOURCE_DIR}/contrib/cubefs/libsdk) if (TARGET ch_contrib::llvm) dbms_target_link_libraries (PUBLIC ch_contrib::llvm) @@ -464,6 +471,10 @@ if (TARGET ch_contrib::aws_s3) target_link_libraries (clickhouse_common_io PUBLIC ch_contrib::aws_s3) endif() +if (TARGET ch_contrib::cubefs) + target_link_libraries (clickhouse_common_io PUBLIC ch_contrib::cubefs) +endif() + if (TARGET ch_contrib::azure_sdk) target_link_libraries (clickhouse_common_io PUBLIC ch_contrib::azure_sdk) endif() diff --git a/src/Disks/CubeFS/DiskCubeFS.cpp b/src/Disks/CubeFS/DiskCubeFS.cpp new file mode 100644 index 000000000000..8eaec9f10236 --- /dev/null +++ b/src/Disks/CubeFS/DiskCubeFS.cpp @@ -0,0 +1,675 @@ +#include "DiskCubeFS.h" +#include +#include +#include +#include +#include +namespace CurrentMetrics +{ +extern const Metric DiskSpaceReservedForMerge; +} + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int UNKNOWN_ELEMENT_IN_CONFIG; + extern const int EXCESSIVE_ELEMENT_IN_CONFIG; + extern const int PATH_ACCESS_DENIED; + extern const int LOGICAL_ERROR; + extern const int CANNOT_TRUNCATE_FILE; + extern const int CANNOT_UNLINK; + extern const int CANNOT_RMDIR; + extern const int BAD_ARGUMENTS; + extern const int CANNOT_CREATE_DIRECTORY; + extern const int CANNOT_STATVFS; + extern const int CANNOT_OPEN_FILE; + extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR; + extern const int CANNOT_CLOSE_FILE; + extern const int ATOMIC_RENAME_FAIL; + extern const int FILE_ALREADY_EXISTS; + extern const int DIRECTORY_DOESNT_EXIST; + extern const int FILE_DOESNT_EXIST; +} + +const int default_readdir_count = 10000; +std::mutex DiskCubeFS::reservation_mutex; +using DiskCubeFSPtr = std::shared_ptr; + +class DiskCubeFSReservation : public IReservation +{ +public: + DiskCubeFSReservation(const DiskCubeFSPtr & disk_, UInt64 size_) + : disk(disk_), size(size_), metric_increment(CurrentMetrics::DiskSpaceReservedForMerge, size_) + { + } + + UInt64 getSize() const override { return size; } + + DiskPtr getDisk(size_t i) const override + { + if (i != 0) + throw Exception("Can't use i != 0 with single disk reservation. It's a bug", ErrorCodes::LOGICAL_ERROR); + return disk; + } + + Disks getDisks() const override { return {disk}; } + + void update(UInt64 new_size) override + { + std::lock_guard lock(DiskCubeFS::reservation_mutex); + disk->reserved_bytes -= size; + size = new_size; + disk->reserved_bytes += size; + } + + ~DiskCubeFSReservation() override + { + try + { + std::lock_guard lock(DiskCubeFS::reservation_mutex); + if (disk->reserved_bytes < size) + { + disk->reserved_bytes = 0; + LOG_ERROR(&Poco::Logger::get("DiskCubeFS"), "Unbalanced reservations size for disk '{}'.", disk->getName()); + } + else + { + disk->reserved_bytes -= size; + } + + if (disk->reservation_count == 0) + LOG_ERROR(&Poco::Logger::get("DiskCubeFS"), "Unbalanced reservation count for disk '{}'.", disk->getName()); + else + --disk->reservation_count; + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + } + +private: + DiskCubeFSPtr disk; + UInt64 size; + CurrentMetrics::Increment metric_increment; +}; + + +ReservationPtr DiskCubeFS::reserve(UInt64 bytes) +{ + LOG_DEBUG(logger, "Got reserve request, bytes: {}", bytes); + if (!tryReserve(bytes)) + return {}; + return std::make_unique(std::static_pointer_cast(shared_from_this()), bytes); +} + +bool DiskCubeFS::tryReserve(UInt64 bytes) +{ + std::lock_guard lock(DiskCubeFS::reservation_mutex); + if (bytes == 0) + { + LOG_DEBUG(logger, "Reserving 0 bytes on disk {}", backQuote(name)); + ++reservation_count; + return true; + } + + auto available_space = getTotalSpace(); + UInt64 unreserved_space = available_space - std::min(available_space, reserved_bytes); + if (unreserved_space >= bytes) + { + LOG_DEBUG( + logger, "Reserving {} on disk {}, having unreserved {}.", ReadableSize(bytes), backQuote(name), ReadableSize(unreserved_space)); + ++reservation_count; + reserved_bytes += bytes; + return true; + } + LOG_DEBUG(logger, "DiskCubeFS::tryReserve return false"); + return false; +} + +UInt64 DiskCubeFS::getTotalSpace() const +{ + LOG_DEBUG(logger, "Use mock getTotalSpace function"); + return 419585155072; + //throwFromErrnoWithPath("DiskCubeFs does not support function getTotalSpace!", settings->disk_path, ErrorCodes::LOGICAL_ERROR); +} + +UInt64 DiskCubeFS::getAvailableSpace() const +{ + LOG_DEBUG(logger, "Use mock getAvailableSpace function"); + return 151697231872; + //throwFromErrnoWithPath("DiskCubeFs does not support function getAvailableSpace!", settings->disk_path, ErrorCodes::LOGICAL_ERROR); +} + +UInt64 DiskCubeFS::getUnreservedSpace() const +{ + LOG_DEBUG(logger, "Use mock getUnreservedSpace function"); + return 151697231872; + //throwFromErrnoWithPath("DiskCubeFs does not support function getUnreservedSpace!", settings->disk_path, ErrorCodes::LOGICAL_ERROR); +} + +bool DiskCubeFS::isFile(const String & path) const +{ + cfs_stat_info stat = getFileAttributes(path); + if (S_ISREG(stat.mode)) + return true; + + return false; +} + +bool DiskCubeFS::isDirectory(const String & path) const +{ + cfs_stat_info stat = getFileAttributes(path); + if (S_ISDIR(stat.mode)) + return true; + + return false; +} + +size_t DiskCubeFS::getFileSize(const String & path) const +{ + cfs_stat_info stat = getFileAttributes(path); + return stat.size; +} + +cfs_stat_info DiskCubeFS::getFileAttributes(const String & relative_path) const +{ + cfs_stat_info stat; + fs::path full_path = fs::path(settings->disk_path) / relative_path; + int result = sdk_loader->cfsGetattr(id, const_cast(full_path.c_str()), &stat); + if (result != 0) + { + int error_code = errno; + std::string error_message = strerror(error_code); + std::string full_error_message = "Failed to get file attribute: " + full_path.string() + ": " + error_message; + throwFromErrnoWithPath(full_error_message, full_path, ErrorCodes::CANNOT_STATVFS); + } + return stat; +} + +fs::path DiskCubeFS::getParentPath(const String & path) +{ + fs::path filePath(path); + fs::path parentPath = filePath.parent_path(); + + if (path.back() == '/' && parentPath != filePath) + { + parentPath = parentPath.parent_path(); + } + return parentPath; +} + +void DiskCubeFS::createDirectory(const String & path) +{ + fs::path parent_dir = getParentPath(path); + if (!exists(parent_dir)) + { + throwFromErrnoWithPath( + "Parent directory does not exist [" + parent_dir.string() + "]", parent_dir, ErrorCodes::DIRECTORY_DOESNT_EXIST); + } + fs::path full_path = fs::path(settings->disk_path) / path; + createDirectories(path); +} + +void DiskCubeFS::createDirectories(const String & path) +{ + fs::path full_path = fs::path(settings->disk_path) / path; + int result = sdk_loader->cfsMkdirs(id, const_cast(full_path.string().c_str()), O_RDWR | O_CREAT); + if (result != 0) + { + throwFromErrnoWithPath("Cannot mkdir: " + full_path.string(), full_path, ErrorCodes::CANNOT_CREATE_DIRECTORY); + } +} + +void DiskCubeFS::clearDirectory(const String & path) +{ + std::vector file_names; + listFiles(path, file_names); + for (const auto & filename : file_names) + { + fs::path file_path = fs::path(settings->disk_path) / filename; + + int result = sdk_loader->cfsUnlink(id, const_cast(file_path.string().c_str())); + if (result < 0) + { + throwFromErrnoWithPath("Cannot unlink file: " + file_path.string(), file_path, ErrorCodes::CANNOT_UNLINK); + } + } +} + +void DiskCubeFS::listFiles(const String & path, std::vector & file_names) +{ + file_names.clear(); + fs::path full_path = fs::path(settings->disk_path) / path; + int fd = -1; + fd = sdk_loader->cfsOpen(id, const_cast(full_path.string().c_str()), 0, 0); + if (fd < 0) + { + throwFromErrnoWithPath("Cannot open: " + full_path.string(), full_path, ErrorCodes::CANNOT_OPEN_FILE); + } + int count = default_readdir_count; + while (true) + { + std::vector direntsInfo(count); + std::memset(direntsInfo.data(), 0, count * sizeof(cfs_dirent_info)); + int num_entries = sdk_loader->cfsReaddir(id, fd, {direntsInfo.data(), count, count}, count); + if (num_entries < 0) + { + throwFromErrnoWithPath("Cannot readdir: " + full_path.string(), full_path, ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR); + } + if (num_entries < count) + { + for (int i = 0; i < num_entries; i++) + { + std::string file_name(direntsInfo[i].name); + file_names.emplace_back(file_name); + } + sdk_loader->cfsClose(id, fd); + return; + } + count = count * 2; + } +} + + +void DiskCubeFS::removeFile(const String & path) +{ + fs::path full_path = (fs::path(settings->disk_path) / path); + if (!fileExists(path)) + { + throwFromErrnoWithPath("File does not exists: " + full_path.string(), full_path, ErrorCodes::FILE_DOESNT_EXIST); + } + int result = sdk_loader->cfsUnlink(id, const_cast(full_path.c_str())); + if (result != 0) + { + throwFromErrnoWithPath("Cannot unlink file: " + full_path.string(), full_path, ErrorCodes::CANNOT_UNLINK); + } +} + +void DiskCubeFS::moveDirectory(const String & from_path, const String & to_path) +{ + moveFile(from_path, to_path); +} + +class DiskCubeFSDirectoryIterator final : public IDiskDirectoryIterator +{ +public: + DiskCubeFSDirectoryIterator() : id(-1), fd(-1), current_index(0) { } + DiskCubeFSDirectoryIterator(int64_t client_id, const String & path, std::shared_ptr sdk_loader_) + : id(client_id), dir_path(path), fd(-1), current_index(0), sdk_loader(sdk_loader_) + { + openDirectory(); + } + ~DiskCubeFSDirectoryIterator() override { closeDirectory(); } + void next() override { ++current_index; } + bool isValid() const override { return current_index < dirents.size(); } + String path() const override + { + if (isValid()) + return (fs::path(dir_path) / dirents[current_index]).string(); + else + return ""; + } + String name() const override + { + if (isValid()) + return dirents[current_index]; + else + return ""; + } + +private: + int64_t id; + String dir_path; + int fd; + std::vector dirents; + size_t current_index; + std::shared_ptr sdk_loader; + void openDirectory() + { + fd = sdk_loader->cfsOpen(id, const_cast(dir_path.c_str()), O_RDONLY | O_DIRECTORY, 0755); + if (fd < 0) + { + throwFromErrnoWithPath("Cannot open: " + dir_path, fs::path(dir_path), ErrorCodes::CANNOT_OPEN_FILE); + } + int count = default_readdir_count; + while (true) + { + std::vector direntsInfo(count); + std::memset(direntsInfo.data(), 0, count * sizeof(cfs_dirent_info)); + int num_entries = sdk_loader->cfsReaddir(id, fd, {direntsInfo.data(), count, count}, count); + if (num_entries < 0) + { + throwFromErrnoWithPath("Cannot readdir: " + dir_path, fs::path(dir_path), ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR); + } + if (num_entries < count) + { + for (int i = 0; i < num_entries; i++) + { + std::string file_name(direntsInfo[i].name); + dirents.emplace_back(file_name); + } + return; + } + count = count * 2; + } + } + void closeDirectory() + { + if (fd > 0) + sdk_loader->cfsClose(id, fd); + } +}; + +DiskDirectoryIteratorPtr DiskCubeFS::iterateDirectory(const String & path) +{ + fs::path meta_path = fs::path(settings->disk_path) / path; + if (!broken && directoryExists(meta_path)) + return std::make_unique(id, meta_path.string(), sdk_loader); + else + return std::make_unique(); +} + +void DiskCubeFS::createFile(const String & path) +{ + fs::path parent_dir = getParentPath(path); + if (!exists(parent_dir)) + { + throwFromErrnoWithPath( + "Parent directory does not exist [" + parent_dir.string() + "]", parent_dir, ErrorCodes::DIRECTORY_DOESNT_EXIST); + } + fs::path full_path = fs::path(settings->disk_path) / path; + int fd = sdk_loader->cfsOpen( + id, + const_cast(full_path.string().c_str()), + O_WRONLY | O_CREAT | O_EXCL, + S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH); + if (fd < 0) + { + throwFromErrnoWithPath("Cannot open: " + full_path.string(), full_path, ErrorCodes::CANNOT_OPEN_FILE); + } + sdk_loader->cfsClose(id, fd); +} + +void DiskCubeFS::replaceFile(const String & from_path, const String & to_path) +{ + fs::path parent_dir = getParentPath(to_path); + if (!exists(parent_dir)) + { + throwFromErrnoWithPath( + "Destination directory does not exist [" + parent_dir.string() + "]", parent_dir, ErrorCodes::DIRECTORY_DOESNT_EXIST); + } + fs::path full_from_path = fs::path(settings->disk_path) / from_path; + fs::path full_to_path = fs::path(settings->disk_path) / to_path; + int result + = sdk_loader->cfsRename(id, const_cast(full_from_path.string().c_str()), const_cast(full_to_path.string().c_str())); + if (result != 0) + { + throwFromErrnoWithPath( + "Failed to rename file, from path: " + full_from_path.string() + " to path: " + full_to_path.string(), + "", + ErrorCodes::ATOMIC_RENAME_FAIL); + } +} + +void DiskCubeFS::moveFile(const String & from_path, const String & to_path) +{ + if (exists(to_path)) + { + throwFromErrnoWithPath( + "Destination file already exists [" + (fs::path(settings->disk_path) / to_path).string() + "]", + fs::path(settings->disk_path) / to_path, + ErrorCodes::FILE_ALREADY_EXISTS); + } + replaceFile(from_path, to_path); +} + +void DiskCubeFS::removeFileIfExists(const String & path) +{ + if (fileExists(path)) + { + auto fs_path = fs::path(settings->disk_path) / path; + if (0 != sdk_loader->cfsUnlink(id, const_cast(fs_path.c_str()))) + throwFromErrnoWithPath("Cannot unlink file " + fs_path.string(), fs_path, ErrorCodes::CANNOT_UNLINK); + } +} + +void DiskCubeFS::removeDirectory(const String & path) +{ + auto fs_path = fs::path(settings->disk_path) / path; + if (0 != sdk_loader->cfsRmdir(id, const_cast(fs_path.c_str()))) + { + int error_code = errno; + std::string error_message = strerror(error_code); + std::string full_error_message = "Cannot rmdir " + fs_path.string() + ": " + error_message; + throwFromErrnoWithPath(full_error_message, fs_path, ErrorCodes::CANNOT_RMDIR); + } +} + +void DiskCubeFS::removeRecursive(const String & path) +{ + cfs_stat_info stat = getFileAttributes(path); + if (S_ISDIR(stat.mode)) + { + std::vector file_names; + listFiles(path, file_names); + for (const auto & filename : file_names) + { + auto child_path = fs::path(path) / filename; + removeRecursive(child_path.string()); + } + removeDirectory(path); + } + else + { + removeFile(path); + } +} + +//warning: The file stat information is cached locally and may not be updated immediately +void DiskCubeFS::setLastModified(const String & path, const Poco::Timestamp & timestamp) +{ + constexpr uint32_t AttrModifyTime = 1 << 3; + constexpr uint32_t AttrAccessTime = 1 << 4; + cfs_stat_info stat = getFileAttributes(path); + stat.atime = timestamp.epochTime(); + stat.mtime = timestamp.epochTime(); + fs::path full_path = fs::path(settings->disk_path) / path; + if (sdk_loader->cfsSetattr(id, const_cast(full_path.string().c_str()), &stat, AttrModifyTime | AttrAccessTime) != 0) + { + throwFromErrnoWithPath("Cannot setattr " + full_path.string(), full_path, ErrorCodes::LOGICAL_ERROR); + } +} + +Poco::Timestamp DiskCubeFS::getLastModified(const String & path) +{ + return Poco::Timestamp::fromEpochTime(getLastChanged(path)); +} + +time_t DiskCubeFS::getLastChanged(const String & path) const +{ + cfs_stat_info stat = getFileAttributes(path); + return stat.ctime; +} + +void DiskCubeFS::setReadOnly(const String & path) +{ + cfs_stat_info stat = getFileAttributes(path); + fs::path full_path = fs::path(settings->disk_path) / path; + int fd = sdk_loader->cfsOpen(id, const_cast(full_path.string().c_str()), 0, 0); + if (fd < 0) + { + throwFromErrnoWithPath("Cannot open: " + full_path.string(), full_path, ErrorCodes::CANNOT_OPEN_FILE); + } + mode_t newMode = stat.mode & ~(S_IWUSR | S_IWGRP | S_IWOTH); + if (sdk_loader->cfsFchmod(id, fd, newMode) != 0) + { + throwFromErrnoWithPath("Cannot fchmod " + full_path.string(), full_path, ErrorCodes::LOGICAL_ERROR); + } +} + +void DiskCubeFS::createHardLink(const String &, const String &) +{ + throwFromErrnoWithPath("DiskCubeFs does not support function createHardLink!", settings->disk_path, ErrorCodes::LOGICAL_ERROR); +} + +DiskCubeFS::DiskCubeFS(const String & name_, SettingsPtr settings_) + : name(name_), settings(std::move(settings_)), logger(&Poco::Logger::get("DiskCubeFS")) +{ + sdk_loader = std::make_shared(settings->lib_path); + id = sdk_loader->cfsNewClient(); + if (id <= 0) + { + throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to create client"); + } + setClientInfo("volName", const_cast(settings->vol_name.data())); + setClientInfo("masterAddr", const_cast(settings->master_addr.data())); + setClientInfo("logDir", const_cast(settings->log_dir.data())); + setClientInfo("logLevel", const_cast(settings->log_level.data())); + setClientInfo("accessKey", const_cast(settings->access_key.data())); + setClientInfo("secretKey", const_cast(settings->secret_key.data())); + setClientInfo("pushAddr", const_cast(settings->push_addr.data())); + if (sdk_loader->cfsStartClient(id) != 0) + { + throwFromErrnoWithPath("Start cfs client failed", "", ErrorCodes::LOGICAL_ERROR); + } + createDirectories(""); +} + +DiskCubeFS::DiskCubeFS(const String & name_, ContextPtr, SettingsPtr settings_) : DiskCubeFS(name_, settings_) +{ +} + +void DiskCubeFS::getClientId() +{ + id = sdk_loader->cfsNewClient(); + if (id <= 0) + { + throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to create client"); + } +} + +void DiskCubeFS::setClientInfo(const char * key, char * value) +{ + if (sdk_loader->cfsSetClient(id, strdup(key), value) != 0) + { + sdk_loader->cfsCloseClient(id); + throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to set client info"); + } +} + +DiskCubeFSSettings ::DiskCubeFSSettings( + String vol_name_, + String master_addr_, + String log_dir_, + String log_level_, + String access_key_, + String secret_key_, + String push_addr_, + String disk_path_, + String lib_path_) + : vol_name(vol_name_) + , master_addr(master_addr_) + , log_dir(log_dir_) + , log_level(log_level_) + , access_key(access_key_) + , secret_key(secret_key_) + , push_addr(push_addr_) + , disk_path(disk_path_) + , lib_path(lib_path_) +{ +} + +bool DiskCubeFS::canRead(const std::string & path) +{ + cfs_stat_info stat = getFileAttributes(path); + if (stat.uid == geteuid()) + return (stat.mode & S_IRUSR) != 0; + else if (stat.gid == getegid()) + return (stat.mode & S_IRGRP) != 0; + else + return (stat.mode & S_IROTH) != 0 || geteuid() == 0; +} + +bool DiskCubeFS::exists(const String & path) const +{ + fs::path full_path = fs::path(settings->disk_path) / path; + cfs_stat_info stat; + int result = sdk_loader->cfsGetattr(id, const_cast(full_path.string().c_str()), &stat); + return (result == 0); +} + +bool DiskCubeFS::fileExists(const String & path) const +{ + fs::path full_path = fs::path(settings->disk_path) / path; + cfs_stat_info stat; + int result = sdk_loader->cfsGetattr(id, const_cast(full_path.string().c_str()), &stat); + return (result == 0) && (S_ISREG(stat.mode)); +} + +bool DiskCubeFS::directoryExists(const String & path) const +{ + fs::path full_path = fs::path(settings->disk_path) / path; + cfs_stat_info stat; + int result = sdk_loader->cfsGetattr(id, const_cast(full_path.string().c_str()), &stat); + return (result == 0) && (S_ISDIR(stat.mode)); +} + +std::optional DiskCubeFS::fileSizeSafe(const fs::path & path) +{ + cfs_stat_info stat; + int result = sdk_loader->cfsGetattr(id, const_cast(path.c_str()), &stat); + if (result == 0) + { + return stat.size; + } + else if (result == -ENOENT || result == -ENOTDIR) + { + return std::nullopt; + } + else + { + throw std::runtime_error("Failed to get file size: " + path.string()); + } +} + +std::unique_ptr +DiskCubeFS::readFile(const String & path, const ReadSettings &, std::optional, std::optional) const +{ + if (!fileExists(path)) + { + fs::path full_path = fs::path(settings->disk_path) / path; + throwFromErrnoWithPath("File does not exists: " + full_path.string(), full_path, ErrorCodes::FILE_DOESNT_EXIST); + } + return std::make_unique(id, fs::path(settings->disk_path) / path, sdk_loader, O_RDONLY | O_CLOEXEC); +} + +// std::optional DiskCubeFS::readDiskCheckerMagicNumber() const noexcept +// { +// try +// { +// auto buf = readFile(disk_checker_path); +// UInt32 magic_number; +// readIntBinary(magic_number, *buf); +// if (buf->eof()) +// return magic_number; +// LOG_WARNING(logger, "The size of disk check magic number is more than 4 bytes. Mark it as read failure"); +// return {}; +// } +// catch (...) +// { +// tryLogCurrentException( +// logger, fmt::format("Cannot read correct disk check magic number from from {}{}", settings->disk_path, disk_checker_path)); +// return {}; +// } +// } + +std::unique_ptr DiskCubeFS::writeFile(const String & path, size_t buf_size, WriteMode mode) +{ + int flags = (mode == WriteMode::Append) ? (O_APPEND | O_CREAT | O_WRONLY) : -1; + return std::make_unique(id, fs::path(settings->disk_path) / path, sdk_loader, buf_size, flags); +} +} diff --git a/src/Disks/CubeFS/DiskCubeFS.h b/src/Disks/CubeFS/DiskCubeFS.h new file mode 100644 index 000000000000..bcdc239b7a11 --- /dev/null +++ b/src/Disks/CubeFS/DiskCubeFS.h @@ -0,0 +1,110 @@ +#pragma once + +#include +#include "sdkLoader.h" + +namespace DB +{ +struct DiskCubeFSSettings +{ + DiskCubeFSSettings( + String vol_name_, + String master_addr_, + String log_dir_, + String log_level_, + String access_key_, + String secret_key_, + String push_addr_, + String disk_path_, + String lib_path_); + String vol_name; + String master_addr; + String log_dir; + String log_level; + String access_key; + String secret_key; + String push_addr; + String disk_path; + String lib_path; +}; + +class DiskCubeFS final : public IDisk +{ +public: + friend class DiskCubeFSReservation; + using SettingsPtr = std::shared_ptr; + + DiskCubeFS(const String & name_, SettingsPtr settings_); + DiskCubeFS(const String & name_, ContextPtr context, SettingsPtr settings_); + std::unique_ptr + readFile(const String & path, const ReadSettings & settings, std::optional read_hint, std::optional file_size) + const override; + std::unique_ptr writeFile(const String & path, size_t buf_size, WriteMode mode) override; + bool exists(const String & path) const override; + const String & getName() const override { return name; } + const String & getPath() const override { return settings->disk_path; } + ReservationPtr reserve(UInt64 bytes) override; + UInt64 getTotalSpace() const override; + UInt64 getUnreservedSpace() const override; + bool isFile(const String & path) const override; + bool isDirectory(const String & path) const override; + size_t getFileSize(const String & path) const override; + //differs from createDirectories, an error will be reported if the parent directory does not exist. + void createDirectory(const String & path) override; + void createDirectories(const String & path) override; + /* + Delete all files under the folder + Empty subfolders can be deleted, but if there are files in the subfolders, an error will be reported.. + */ + void clearDirectory(const String & path) override; + /* + Follow fs::rename semantics. If the upper-level directory on the destination does not exist, an error will be reported. + By modifying the cubefs code, the cfs_rename method supports overwriting the destination file. + */ + void moveDirectory(const String & from_path, const String & to_path) override; + //differs from replaceFile, will not overwrite the destination file + void moveFile(const String & from_path, const String & to_path) override; + void replaceFile(const String & from_path, const String & to_path) override; + DiskDirectoryIteratorPtr iterateDirectory(const String & path) override; + //If the parent directory does not exist, an error will be reported. + void createFile(const String & path) override; + void listFiles(const String & path, std::vector & file_names) override; + void removeFile(const String & path) override; + void removeFileIfExists(const String & path) override; + void removeDirectory(const String & path) override; + void setLastModified(const String & path, const Poco::Timestamp & timestamp) override; + Poco::Timestamp getLastModified(const String & path) override; + time_t getLastChanged(const String & path) const override; + void setReadOnly(const String & path) override; + void createHardLink(const String & src_path, const String & dst_path) override; + DiskType getType() const override { return DiskType::CubeFS; } + bool isRemote() const override { return true; } + bool supportZeroCopyReplication() const override { return false; } + void removeRecursive(const String & path) override; + UInt64 getAvailableSpace() const override; + +private: + bool fileExists(const String & path) const; + bool directoryExists(const String & path) const; + cfs_stat_info getFileAttributes(const String & relative_path) const; + bool canRead(const std::string & path); + std::optional fileSizeSafe(const fs::path & path); + bool tryReserve(UInt64 bytes); + void getClientId(); + void setClientInfo(const char * key, char * value); + fs::path getParentPath(const String & path); + + const String name; + SettingsPtr settings; + Poco::Logger * logger; + static std::mutex reservation_mutex; + //std::atomic keep_free_space_bytes; + std::atomic broken{false}; + std::atomic readonly{false}; + UInt64 reserved_bytes = 0; + //const String disk_checker_path = ".disk_checker_file"; + UInt64 reservation_count = 0; + int64_t id; + std::shared_ptr sdk_loader; +}; +} diff --git a/src/Disks/CubeFS/registerDiskCubeFS.cpp b/src/Disks/CubeFS/registerDiskCubeFS.cpp new file mode 100644 index 000000000000..c330338bd24b --- /dev/null +++ b/src/Disks/CubeFS/registerDiskCubeFS.cpp @@ -0,0 +1,49 @@ +#include +#include "DiskCubeFS.h" +#include "Disks/DiskFactory.h" +#include "Disks/DiskRestartProxy.h" +namespace DB +{ +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + + DiskCubeFS::SettingsPtr getSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix) + { + LOG_DEBUG(&Poco::Logger::get("DiskCubeFS"), "volumn name: {}", config.getString(config_prefix + ".vol_name")); + LOG_DEBUG(&Poco::Logger::get("DiskCubeFS"), "master address: {}", config.getString(config_prefix + ".master_addr")); + LOG_DEBUG(&Poco::Logger::get("DiskCubeFS"), "log directory: {}", config.getString(config_prefix + ".log_dir")); + LOG_DEBUG(&Poco::Logger::get("DiskCubeFS"), "log level: {}", config.getString(config_prefix + ".log_level")); + LOG_DEBUG(&Poco::Logger::get("DiskCubeFS"), "access key: {}", config.getString(config_prefix + ".access_key")); + LOG_DEBUG(&Poco::Logger::get("DiskCubeFS"), "secret key: {}", config.getString(config_prefix + ".secret_key")); + LOG_DEBUG(&Poco::Logger::get("DiskCubeFS"), "push address: {}", config.getString(config_prefix + ".push_addr")); + LOG_DEBUG(&Poco::Logger::get("DiskCubeFS"), "disk path: {}", config.getString(config_prefix + ".disk_path")); + LOG_DEBUG(&Poco::Logger::get("DiskCubeFS"), "lib path: {}", config.getString(config_prefix + ".lib_path")); + return std::make_shared( + config.getString(config_prefix + ".vol_name"), + config.getString(config_prefix + ".master_addr"), + config.getString(config_prefix + ".log_dir"), + config.getString(config_prefix + ".log_level", "debug"), + config.getString(config_prefix + ".access_key"), + config.getString(config_prefix + ".secret_key"), + config.getString(config_prefix + ".push_addr"), + config.getString(config_prefix + ".disk_path"), + config.getString(config_prefix + ".lib_path")); + } + +void registerDiskCubeFS(DiskFactory & factory) +{ + auto creator = [](const String & name, + const Poco::Util::AbstractConfiguration & config, + const String & config_prefix, + ContextPtr context, + const DisksMap & /*map*/) -> DiskPtr + { + auto settings = getSettings(config, config_prefix); + std::shared_ptr cubeFSdisk = std::make_shared(name, context, settings); + return std::make_shared(cubeFSdisk); + }; + factory.registerDiskType("cubefs", creator); +} +} diff --git a/src/Disks/CubeFS/sdkLoader.cpp b/src/Disks/CubeFS/sdkLoader.cpp new file mode 100644 index 000000000000..188f47e81d4a --- /dev/null +++ b/src/Disks/CubeFS/sdkLoader.cpp @@ -0,0 +1,48 @@ +#include "sdkLoader.h" +#include +#include +namespace DB +{ +namespace ErrorCodes +{ + extern const int EXTERNAL_LIBRARY_ERROR; +} +SdkLoader::SdkLoader(String lib_path_) : lib_path(lib_path_), logger(&Poco::Logger::get("sdkLoader")) +{ + LOG_DEBUG(logger, "load cubefs shared library from path: {}", lib_path_); + void * handle = dlopen(lib_path_.c_str(), RTLD_LAZY); + if (!handle) + { + throwFromErrnoWithPath("Failed to open the dynamic library", lib_path_, ErrorCodes::EXTERNAL_LIBRARY_ERROR); + } + cfsNewClient = reinterpret_cast(dlsym(handle, "cfs_new_client")); + cfsSetClient = reinterpret_cast(dlsym(handle, "cfs_set_client")); + cfsStartClient = reinterpret_cast(dlsym(handle, "cfs_start_client")); + cfsCloseClient = reinterpret_cast(dlsym(handle, "cfs_close_client")); + cfsChdir = reinterpret_cast(dlsym(handle, "cfs_chdir")); + cfsGetcwd = reinterpret_cast(dlsym(handle, "cfs_getcwd")); + cfsGetattr = reinterpret_cast(dlsym(handle, "cfs_getattr")); + cfsSetattr = reinterpret_cast(dlsym(handle, "cfs_setattr")); + cfsOpen = reinterpret_cast(dlsym(handle, "cfs_open")); + cfsFlush = reinterpret_cast(dlsym(handle, "cfs_flush")); + cfsClose = reinterpret_cast(dlsym(handle, "cfs_close")); + cfsLsdir = reinterpret_cast(dlsym(handle, "cfs_lsdir")); + cfsMkdirs = reinterpret_cast(dlsym(handle, "cfs_mkdirs")); + cfsRmdir = reinterpret_cast(dlsym(handle, "cfs_rmdir")); + cfsUnlink = reinterpret_cast(dlsym(handle, "cfs_unlink")); + cfsRename = reinterpret_cast(dlsym(handle, "cfs_rename")); + cfsFchmod = reinterpret_cast(dlsym(handle, "cfs_fchmod")); + cfsGetsummary = reinterpret_cast(dlsym(handle, "cfs_getsummary")); + cfsWrite = reinterpret_cast(dlsym(handle, "cfs_write")); + cfsRead = reinterpret_cast(dlsym(handle, "cfs_read")); + cfsReaddir = reinterpret_cast(dlsym(handle, "cfs_readdir")); + + if (!cfsNewClient || !cfsSetClient || !cfsStartClient || !cfsCloseClient || !cfsChdir || !cfsGetcwd || !cfsGetattr || !cfsSetattr + || !cfsOpen || !cfsFlush || !cfsClose || !cfsWrite || !cfsRead || !cfsLsdir || !cfsMkdirs || !cfsRmdir || !cfsUnlink || !cfsRename + || !cfsFchmod || !cfsGetsummary || !cfsReaddir) + { + dlclose(handle); + throwFromErrnoWithPath("Failed to find one or more functions", "", ErrorCodes::EXTERNAL_LIBRARY_ERROR); + } +} +} diff --git a/src/Disks/CubeFS/sdkLoader.h b/src/Disks/CubeFS/sdkLoader.h new file mode 100644 index 000000000000..77c35527fe1b --- /dev/null +++ b/src/Disks/CubeFS/sdkLoader.h @@ -0,0 +1,63 @@ +#pragma once +#include +#include +#include + +namespace DB +{ +class SdkLoader +{ +public: + SdkLoader(String lib_path_); + + typedef int64_t (*CfsNewClientFunction)(); + typedef int (*CfsSetClientFunction)(int64_t, char *, char *); + typedef int (*CfsStartClientFunction)(int64_t); + typedef void (*CfsCloseClientFunction)(int64_t); + typedef int (*CfsChdirFunction)(int64_t, char *); + typedef char * (*CfsGetcwdFunction)(int64_t); + typedef int (*CfsGetattrFunction)(int64_t, char *, struct cfs_stat_info *); + typedef int (*CfsSetattrFunction)(int64_t, char *, struct cfs_stat_info *, int); + typedef int (*CfsOpenFunction)(int64_t, char *, int, mode_t); + typedef int (*CfsFlushFunction)(int64_t, int); + typedef void (*CfsCloseFunction)(int64_t, int); + typedef ssize_t (*CfsWriteFunction)(int64_t, int, void *, size_t, off_t); + typedef ssize_t (*CfsReadFunction)(int64_t, int, void *, size_t, off_t); + //typedef int (*CfsBatchGetInodesFunction)(int64_t, int, void *, GoSlice, int); + //typedef int (*CfsRefreshSummaryFunction)(int64_t, char *, int); + typedef int (*CfsReaddirFunction)(int64_t, int, GoSlice, int); + typedef int (*CfsLsdirFunction)(int64_t, int, GoSlice, int); + typedef int (*CfsMkdirsFunction)(int64_t, char *, mode_t); + typedef int (*CfsRmdirFunction)(int64_t, char *); + typedef int (*CfsUnlinkFunction)(int64_t, char *); + typedef int (*CfsRenameFunction)(int64_t, char *, char *); + typedef int (*CfsFchmodFunction)(int64_t, int, mode_t); + typedef int (*CfsGetsummaryFunction)(int64_t, char *, struct cfs_summary_info *, char *, int); + + CfsNewClientFunction cfsNewClient; + CfsSetClientFunction cfsSetClient; + CfsStartClientFunction cfsStartClient; + CfsCloseClientFunction cfsCloseClient; + CfsChdirFunction cfsChdir; + CfsGetcwdFunction cfsGetcwd; + CfsGetattrFunction cfsGetattr; + CfsSetattrFunction cfsSetattr; + CfsOpenFunction cfsOpen; + CfsFlushFunction cfsFlush; + CfsCloseFunction cfsClose; + CfsLsdirFunction cfsLsdir; + CfsMkdirsFunction cfsMkdirs; + CfsRmdirFunction cfsRmdir; + CfsUnlinkFunction cfsUnlink; + CfsRenameFunction cfsRename; + CfsFchmodFunction cfsFchmod; + CfsGetsummaryFunction cfsGetsummary; + CfsWriteFunction cfsWrite; + CfsReadFunction cfsRead; + CfsReaddirFunction cfsReaddir; + +private: + String lib_path; + Poco::Logger * logger; +}; +} diff --git a/src/Disks/DiskType.h b/src/Disks/DiskType.h index 435f427b05a0..05b4f0f46703 100644 --- a/src/Disks/DiskType.h +++ b/src/Disks/DiskType.h @@ -14,6 +14,7 @@ enum class DiskType Encrypted, WebServer, AzureBlobStorage, + CubeFS, }; inline String toString(DiskType disk_type) @@ -34,6 +35,8 @@ inline String toString(DiskType disk_type) return "web"; case DiskType::AzureBlobStorage: return "azure_blob_storage"; + case DiskType::CubeFS: + return "cubefs"; } __builtin_unreachable(); } diff --git a/src/Disks/registerDisks.cpp b/src/Disks/registerDisks.cpp index 88c3fdde1e06..47843ed943f4 100644 --- a/src/Disks/registerDisks.cpp +++ b/src/Disks/registerDisks.cpp @@ -9,7 +9,7 @@ namespace DB void registerDiskLocal(DiskFactory & factory); void registerDiskMemory(DiskFactory & factory); - +void registerDiskCubeFS(DiskFactory & factory); #if USE_AWS_S3 void registerDiskS3(DiskFactory & factory); #endif @@ -35,6 +35,7 @@ void registerDisks() registerDiskLocal(factory); registerDiskMemory(factory); + registerDiskCubeFS(factory); #if USE_AWS_S3 registerDiskS3(factory); diff --git a/src/Disks/tests/gtest_disk_cubefs.cpp b/src/Disks/tests/gtest_disk_cubefs.cpp new file mode 100644 index 000000000000..7b5dfb0e1481 --- /dev/null +++ b/src/Disks/tests/gtest_disk_cubefs.cpp @@ -0,0 +1,256 @@ +#include +#include +#include +#include +#include +#define RUN_CUBEFS_TEST 1 +#if RUN_CUBEFS_TEST + +# include +# include +# include +# include +# include +# include + +using namespace DB; + +const String path = "/test_dir"; +const String vol_name = "xieyichen"; +const String master_addr = "cfs-south.oppo.local"; +const String log_dir = "/home/xieyichen/logs/cfs2/test-log"; +const String log_level = "debug"; +const String access_key = "jRlZO65q7XlH5bnV"; +const String secret_key = "V1m730UzREHaK1jCkC0kL0cewOX0kH3K"; +const String push_addr = "cfs.dg-push.wanyol.com"; +const String disk_name = "cubefs"; +const String lib_path = "/home/xieyichen/ClickHouse/libcfs.so"; + +class DiskTestCubeFS : public testing::Test +{ +public: + void SetUp() override + { + auto settings = std::make_shared( + vol_name, master_addr, log_dir, log_level, access_key, secret_key, push_addr, path, lib_path); + std::cout << "set client info successfully " << std::endl; + disk = std::make_shared(disk_name, settings); + std::cout << "initial cubefs disk successfully " << std::endl; + } + void TearDown() override + { + disk->removeRecursive(""); + disk.reset(); + } + DB::DiskPtr disk; +}; + +TEST_F(DiskTestCubeFS, createDirectory) +{ + EXPECT_TRUE(this->disk->exists("/")); + std::cout << "empty exist is ok" << std::endl; + EXPECT_EQ(disk_name, this->disk->getName()); + + disk->createDirectory("create_directory"); + EXPECT_TRUE(this->disk->exists("create_directory")); + EXPECT_TRUE(this->disk->isDirectory("create_directory")); + EXPECT_FALSE(this->disk->isFile("create_directory")); + + this->disk->createDirectories("not_exist_parent_directory/subdirectory"); + EXPECT_TRUE(this->disk->isDirectory("not_exist_parent_directory/subdirectory")); + std::cout << "class disk path: " << disk->getPath() << std::endl; +} + + +TEST_F(DiskTestCubeFS, moveDirectory) +{ + disk->createDirectory("create_directory"); + disk->createDirectories("not_exist_parent_directory/subdirectory"); + try + { + std::cout << "try move directory" << std::endl; + this->disk->moveDirectory("not_exist_parent_directory/subdirectory", "not_exist/create_directory/"); + FAIL() << "Expected exception to be thrown."; + //EXPECT_THROW(cubeFSdisk->moveDirectory("not_exist_parent_directory/subdirectory", "not_exist/create_directory/"), std::exception); + } + catch (const std::exception & e) + { + std::string errorMessage = e.what(); + std::string expectedSubstring = "Destination directory does not exist"; + EXPECT_TRUE(errorMessage.find(expectedSubstring) != std::string::npos); + } + std::cout << "try move directory end" << std::endl; + this->disk->moveDirectory("not_exist_parent_directory/subdirectory", "create_directory/subdirectory"); + EXPECT_TRUE(this->disk->isDirectory("create_directory/subdirectory")); + std::cout << "move directory end" << std::endl; +} + +TEST_F(DiskTestCubeFS, createFile) +{ + try + { + disk->createFile("create_directory/file"); + FAIL() << "Expected exception to be thrown."; + } + catch (const std::exception & e) + { + std::string errorMessage = e.what(); + std::string expectedSubstring = "Parent directory does not exist"; + EXPECT_TRUE(errorMessage.find(expectedSubstring) != std::string::npos); + } + disk->createDirectory("create_directory"); + + this->disk->createFile("create_directory/file"); + EXPECT_TRUE(this->disk->isFile("create_directory/file")); + EXPECT_FALSE(this->disk->isDirectory("create_directory/file")); + std::cout << "create file end" << std::endl; +} + +TEST_F(DiskTestCubeFS, moveFile) +{ + disk->createDirectory("create_directory"); + disk->createFile("create_directory/file"); + this->disk->moveDirectory("create_directory", "move_directory"); + EXPECT_TRUE(this->disk->isFile("move_directory/file")); + std::cout << "move directory end" << std::endl; + this->disk->moveFile("move_directory/file", "move_file"); + EXPECT_TRUE(this->disk->isFile("move_file")); + disk->createDirectory("create_directory"); + this->disk->createFile("create_directory/create_file"); + try + { + this->disk->moveFile("move_file", "create_directory/create_file"); + FAIL() << "Expected exception to be thrown."; + } + catch (const std::exception & e) + { + std::string errorMessage = e.what(); + std::string expectedSubstring = "Destination file already exists"; + EXPECT_TRUE(errorMessage.find(expectedSubstring) != std::string::npos); + } + EXPECT_TRUE(disk->exists("create_directory/create_file")); + std::cout << "start replace file" << std::endl; + disk->replaceFile("move_file", "create_directory/create_file"); + EXPECT_TRUE(this->disk->isFile("create_directory/create_file")); +} + +TEST_F(DiskTestCubeFS, remove) +{ + try + { + disk->removeFile("not_exist"); + FAIL() << "Expected exception to be thrown."; + } + catch (const std::exception & e) + { + std::string errorMessage = e.what(); + std::string expectedSubstring = "File does not exists"; + EXPECT_TRUE(errorMessage.find(expectedSubstring) != std::string::npos); + } + disk->removeFileIfExists("not_exist"); +} + +/* The file stat information is cached locally and may not be updated immediately +TEST_F(DiskTestCubeFS, lastModified) +{ + disk->createFile("file"); + std::this_thread::sleep_for(std::chrono::seconds(1)); + Poco::Timestamp timestamp; + disk->setLastModified("file", timestamp); + EXPECT_EQ(timestamp.epochTime(), disk->getLastModified("file").epochTime()); +} +*/ + +TEST_F(DiskTestCubeFS, writeFile) +{ + { + std::unique_ptr out = this->disk->writeFile("test_file"); + writeString("test data", *out); + } + + DB::String data; + { + std::unique_ptr in = this->disk->readFile("test_file"); + readString(data, *in); + } + + EXPECT_EQ("test data", data); + EXPECT_EQ(data.size(), this->disk->getFileSize("test_file")); +} + +TEST_F(DiskTestCubeFS, readFile) +{ + try + { + std::unique_ptr in = this->disk->readFile("test_file"); + FAIL() << "Expected exception to be thrown."; + } + catch (const std::exception & e) + { + std::string errorMessage = e.what(); + std::string expectedSubstring = "File does not exists"; + EXPECT_TRUE(errorMessage.find(expectedSubstring) != std::string::npos); + } + + { + std::unique_ptr out = this->disk->writeFile("test_file"); + writeString("test data", *out); + } + + // Test SEEK_SET + { + String buf(4, '0'); + std::unique_ptr in = this->disk->readFile("test_file"); + + in->seek(5, SEEK_SET); + + in->readStrict(buf.data(), 4); + EXPECT_EQ("data", buf); + } + + // Test SEEK_CUR + { + std::unique_ptr in = this->disk->readFile("test_file"); + String buf(4, '0'); + + in->readStrict(buf.data(), 4); + EXPECT_EQ("test", buf); + + // Skip whitespace + in->seek(1, SEEK_CUR); + + in->readStrict(buf.data(), 4); + EXPECT_EQ("data", buf); + } +} + +TEST_F(DiskTestCubeFS, iterateDirectory) +{ + this->disk->createDirectories("test_dir/nested_dir/"); + + { + auto iter = this->disk->iterateDirectory(""); + EXPECT_TRUE(iter->isValid()); + EXPECT_EQ("/test_dir/test_dir", iter->path()); + iter->next(); + EXPECT_FALSE(iter->isValid()); + } + + { + auto iter = this->disk->iterateDirectory("test_dir/"); + EXPECT_TRUE(iter->isValid()); + EXPECT_EQ("/test_dir/test_dir/nested_dir", iter->path()); + iter->next(); + EXPECT_FALSE(iter->isValid()); + } +} + +TEST_F(DiskTestCubeFS, others) +{ + std::cout << "getAvailableSpace result: " << disk->getAvailableSpace() << std::endl; + disk->createFile("file"); + disk->setReadOnly("file"); + std::cout << "getLastChanged result: " << disk->getLastChanged("file") << std::endl; +} + +#endif diff --git a/src/IO/ReadBufferFromCubeFS.cpp b/src/IO/ReadBufferFromCubeFS.cpp new file mode 100644 index 000000000000..07996fc4cbcb --- /dev/null +++ b/src/IO/ReadBufferFromCubeFS.cpp @@ -0,0 +1,195 @@ +#include +namespace ProfileEvents +{ +extern const Event ReadBufferFromFileDescriptorRead; +extern const Event ReadBufferFromFileDescriptorReadFailed; +extern const Event ReadBufferFromFileDescriptorReadBytes; +extern const Event DiskReadElapsedMicroseconds; +extern const Event Seek; +} + +namespace DB +{ +namespace ErrorCodes +{ + extern const int FILE_DOESNT_EXIST; + extern const int CANNOT_OPEN_FILE; + extern const int CANNOT_CLOSE_FILE; +} + +ReadBufferFromCubeFS::~ReadBufferFromCubeFS() +{ + if (fd < 0) + return; + + sdk_loader->cfsClose(id, fd); +} + +void ReadBufferFromCubeFS::close() +{ + if (fd < 0) + return; + try + { + sdk_loader->cfsClose(id, fd); + } + catch (...) + { + throw Exception("Cannot close file", ErrorCodes::CANNOT_CLOSE_FILE); + } + fd = -1; + //metric_increment.destroy(); +} + +off_t ReadBufferFromCubeFS::size() +{ + struct cfs_stat_info file_info; + int result = sdk_loader->cfsGetattr(id, const_cast(file_name.data()), &file_info); + if (result != 0) + { + // Handle the error (throw an exception, return an error code, etc.) + throwFromErrnoWithPath("Cannot execute fstat " + getFileName(), getFileName(), ErrorCodes::CANNOT_FSTAT); + } + return file_info.size; +} + +bool ReadBufferFromCubeFS::nextImpl() +{ + /// If internal_buffer size is empty, then read() cannot be distinguished from EOF + assert(!internal_buffer.empty()); + /// This is a workaround of a read pass EOF bug in linux kernel with pread() + if (file_size.has_value() && file_offset_of_buffer_end >= *file_size) + return false; + size_t bytes_read = 0; + while (!bytes_read) + { + //ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorRead); + + //Stopwatch watch(profile_callback ? clock_type : CLOCK_MONOTONIC); + + ssize_t res = 0; + { + //CurrentMetrics::Increment metric_increment{CurrentMetrics::Read}; + res = sdk_loader->cfsRead(id, fd, internal_buffer.begin(), internal_buffer.size(), file_offset_of_buffer_end); + } + if (!res) + break; + + if (-1 == res && errno != EINTR) + { + //ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadFailed); + throwFromErrnoWithPath("Cannot read from file " + getFileName(), getFileName(), ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR); + } + + if (res > 0) + bytes_read += res; + + /// It reports real time spent including the time spent while thread was preempted doing nothing. + /// And it is Ok for the purpose of this watch (it is used to lower the number of threads to read from tables). + /// Sometimes it is better to use taskstats::blkio_delay_total, but it is quite expensive to get it + /// (TaskStatsInfoGetter has about 500K RPS). + //watch.stop(); + // ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds()); + + // if (profile_callback) + // { + // ProfileInfo info; + // info.bytes_requested = internal_buffer.size(); + // info.bytes_read = res; + // info.nanoseconds = watch.elapsed(); + // profile_callback(info); + // } + } + + file_offset_of_buffer_end += bytes_read; + + if (bytes_read) + { + //ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadBytes, bytes_read); + working_buffer = internal_buffer; + working_buffer.resize(bytes_read); + } + else + return false; + + return true; +} + +ReadBufferFromCubeFS::ReadBufferFromCubeFS( + int64_t id_, + const std::string & file_name_, + std::shared_ptr sdk_loader_, + int flags, + char * existing_memory, + size_t buf_size, + size_t alignment, + std::optional file_size_) + : ReadBufferFromFileBase(buf_size, existing_memory, alignment, file_size_), id(id_), file_name(file_name_), sdk_loader(sdk_loader_) +{ + //ProfileEvents::increment(ProfileEvents::FileOpen); + + fd = sdk_loader->cfsOpen( + id_, const_cast(file_name_.data()), flags == -1 ? O_RDONLY | O_CLOEXEC : flags | O_CLOEXEC, S_IRUSR | S_IWUSR); + + if (-1 == fd) + throwFromErrnoWithPath( + "Cannot open file " + file_name_, file_name_, errno == ENOENT ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE); +} + +/// If 'offset' is small enough to stay in buffer after seek, then true seek in file does not happen. +off_t ReadBufferFromCubeFS::seek(off_t offset, int whence) +{ + size_t new_pos; + if (whence == SEEK_SET) + { + assert(offset >= 0); + new_pos = offset; + } + else if (whence == SEEK_CUR) + { + new_pos = file_offset_of_buffer_end - (working_buffer.end() - pos) + offset; + } + else + { + throw Exception("ReadBufferFromFileDescriptor::seek expects SEEK_SET or SEEK_CUR as whence", ErrorCodes::ARGUMENT_OUT_OF_BOUND); + } + + /// Position is unchanged. + if (new_pos + (working_buffer.end() - pos) == file_offset_of_buffer_end) + return new_pos; + + if (file_offset_of_buffer_end - working_buffer.size() <= static_cast(new_pos) && new_pos <= file_offset_of_buffer_end) + { + /// Position is still inside the buffer. + /// Probably it is at the end of the buffer - then we will load data on the following 'next' call. + + pos = working_buffer.end() - file_offset_of_buffer_end + new_pos; + assert(pos >= working_buffer.begin()); + assert(pos <= working_buffer.end()); + + return new_pos; + } + else + { + /// Position is out of the buffer, we need to do real seek. + off_t seek_pos = new_pos; + + off_t offset_after_seek_pos = new_pos - seek_pos; + + /// First reset the buffer so the next read will fetch new data to the buffer. + resetWorkingBuffer(); + + /// In case of using 'pread' we just update the info about the next position in file. + /// In case of using 'read' we call 'lseek'. + + /// We account both cases as seek event as it leads to non-contiguous reads from file. + ProfileEvents::increment(ProfileEvents::Seek); + file_offset_of_buffer_end = seek_pos; + + if (offset_after_seek_pos > 0) + ignore(offset_after_seek_pos); + + return seek_pos; + } +} +} diff --git a/src/IO/ReadBufferFromCubeFS.h b/src/IO/ReadBufferFromCubeFS.h new file mode 100644 index 000000000000..8eb09d886408 --- /dev/null +++ b/src/IO/ReadBufferFromCubeFS.h @@ -0,0 +1,52 @@ +#pragma once + +#include +#include + +namespace DB +{ +namespace ErrorCodes +{ + extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR; + extern const int ARGUMENT_OUT_OF_BOUND; + extern const int CANNOT_SEEK_THROUGH_FILE; + extern const int CANNOT_SELECT; + extern const int CANNOT_FSTAT; + extern const int CANNOT_ADVISE; +} +class ReadBufferFromCubeFS : public ReadBufferFromFileBase +{ +public: + explicit ReadBufferFromCubeFS( + int64_t id_, + const std::string & file_name_, + std::shared_ptr sdk_loader_, + int flags = -1, + char * existing_memory = nullptr, + size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, + size_t alignment = 0, + std::optional file_size_ = std::nullopt); + ~ReadBufferFromCubeFS() override; + std::string getFileName() const override { return file_name; } + Range getRemainingReadRange() const override { return Range{.left = file_offset_of_buffer_end, .right = std::nullopt}; } + size_t getFileOffsetOfBufferEnd() const override { return file_offset_of_buffer_end; } + off_t getPosition() override { return file_offset_of_buffer_end - (working_buffer.end() - pos); } + void close(); + /** Could be used before initialization if needed 'fd' was not passed to constructor. + * It's not possible to change 'fd' during work. + */ + void setFD(int fd_) { fd = fd_; } + int getFD() const { return fd; } + off_t size(); + /// If 'offset' is small enough to stay in buffer after seek, then true seek in file does not happen. + off_t seek(off_t off, int whence) override; + +private: + bool nextImpl() override; + int fd; + int64_t id; + std::string file_name; + std::shared_ptr sdk_loader; + size_t file_offset_of_buffer_end = 0; /// What offset in file corresponds to working_buffer.end(). +}; +} diff --git a/src/IO/WriteBufferFromCubeFS.cpp b/src/IO/WriteBufferFromCubeFS.cpp new file mode 100644 index 000000000000..ba63a2ddffaf --- /dev/null +++ b/src/IO/WriteBufferFromCubeFS.cpp @@ -0,0 +1,122 @@ +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int FILE_DOESNT_EXIST; + extern const int CANNOT_OPEN_FILE; + extern const int CANNOT_CLOSE_FILE; + extern const int CANNOT_WRITE_TO_FILE_DESCRIPTOR; + extern const int CANNOT_FSYNC; + extern const int CANNOT_SEEK_THROUGH_FILE; + extern const int CANNOT_TRUNCATE_FILE; + extern const int CANNOT_FSTAT; +} + +WriteBufferFromCubeFS::WriteBufferFromCubeFS( + int64_t id_, + const std::string & file_name_, + std::shared_ptr sdk_loader_, + size_t buf_size, + int flags, + mode_t mode, + char * existing_memory, + size_t alignment) + : WriteBufferFromFileBase(buf_size, existing_memory, alignment), id(id_), file_name(std::move(file_name_)), sdk_loader(sdk_loader_) +{ + //If O_APPEND is not added, cfsOpen will write from the specified offset position (0) each time. + fd = sdk_loader->cfsOpen( + id, + const_cast(file_name_.data()), + flags == -1 ? O_WRONLY | O_TRUNC | O_CREAT | O_CLOEXEC | O_APPEND : flags | O_CLOEXEC, + mode); + + if (-1 == fd) + throwFromErrnoWithPath( + "Cannot open file " + file_name, file_name, errno == ENOENT ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE); +} + +WriteBufferFromCubeFS::~WriteBufferFromCubeFS() +{ + finalize(); + sdk_loader->cfsClose(id, fd); +} + +void WriteBufferFromCubeFS::close() +{ + if (fd < 0) + return; + next(); + sdk_loader->cfsClose(id, fd); + fd = -1; +} + +void WriteBufferFromCubeFS::finalizeImpl() +{ + if (fd < 0) + return; + + next(); +} + +void WriteBufferFromCubeFS::nextImpl() +{ + if (!offset()) + return; + + //Stopwatch watch; + + size_t bytes_written = 0; + while (bytes_written != offset()) + { + //ProfileEvents::increment(ProfileEvents::WriteBufferFromFileDescriptorWrite); + + ssize_t res = 0; + { + //CurrentMetrics::Increment metric_increment{CurrentMetrics::Write}; + res = sdk_loader->cfsWrite(id, fd, working_buffer.begin() + bytes_written, offset() - bytes_written, 0); + } + + if ((-1 == res || 0 == res) && errno != EINTR) + { + //ProfileEvents::increment(ProfileEvents::WriteBufferFromFileDescriptorWriteFailed); + + /// Don't use getFileName() here because this method can be called from destructor + String error_file_name = file_name; + if (error_file_name.empty()) + error_file_name = "(fd = " + toString(fd) + ")"; + throwFromErrnoWithPath("Cannot write to file " + error_file_name, error_file_name, ErrorCodes::CANNOT_WRITE_TO_FILE_DESCRIPTOR); + } + + if (res > 0) + bytes_written += res; + } + + //ProfileEvents::increment(ProfileEvents::DiskWriteElapsedMicroseconds, watch.elapsedMicroseconds()); + //ProfileEvents::increment(ProfileEvents::WriteBufferFromFileDescriptorWriteBytes, bytes_written); +} + +void WriteBufferFromCubeFS::sync() +{ + /// If buffer has pending data - write it. + next(); + + int res = sdk_loader->cfsFlush(id, fd); + if (-1 == res) + throwFromErrnoWithPath("Cannot flush " + getFileName(), getFileName(), ErrorCodes::CANNOT_FSYNC); +} + +off_t WriteBufferFromCubeFS::size() +{ + struct cfs_stat_info file_info; + int result = sdk_loader->cfsGetattr(id, const_cast(file_name.data()), &file_info); + if (result != 0) + { + // Handle the error (throw an exception, return an error code, etc.) + throwFromErrnoWithPath("Cannot execute fstat " + getFileName(), getFileName(), ErrorCodes::CANNOT_FSTAT); + } + return file_info.size; +} +} diff --git a/src/IO/WriteBufferFromCubeFS.h b/src/IO/WriteBufferFromCubeFS.h new file mode 100644 index 000000000000..efbb283d250a --- /dev/null +++ b/src/IO/WriteBufferFromCubeFS.h @@ -0,0 +1,38 @@ +#pragma once +#include +#include + +namespace Poco +{ +class Logger; +} +namespace DB +{ +class WriteBufferFromCubeFS : public WriteBufferFromFileBase +{ +public: + explicit WriteBufferFromCubeFS( + int64_t id, + const std::string & file_name_, + std::shared_ptr sdk_loader_, + size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, + int flags = -1, + mode_t mode = 0666, + char * existing_memory = nullptr, + size_t alignment = 0); + ~WriteBufferFromCubeFS() override; + void close(); + void sync() override; + std::string getFileName() const override { return file_name; } + off_t size(); + Poco::Logger * log = &Poco::Logger::get("WriteBufferFromCubeFS"); + +private: + void finalizeImpl() override; + void nextImpl() override; + int64_t id; + std::string file_name; + std::shared_ptr sdk_loader; + int fd; +}; +} From cc8b76b0fd2a0fa8ad61b1d4ae2cdad7b3487723 Mon Sep 17 00:00:00 2001 From: xieyichen Date: Mon, 6 Nov 2023 16:15:46 +0800 Subject: [PATCH 4/4] remove static library --- contrib/cubefs-cmake/CMakeLists.txt | 7 -- programs/CMakeLists.txt | 25 ------- programs/config_tools.h.in | 1 - programs/main.cpp | 6 -- src/Disks/CubeFS/DiskCubeFS.cpp | 24 +----- src/Disks/CubeFS/DiskCubeFS.h | 1 + src/Disks/CubeFS/DiskCubeFSCheckThread.cpp | 70 ------------------ src/Disks/CubeFS/DiskCubeFSCheckThread.h | 39 ---------- src/Disks/CubeFS/registerDiskCubeFS.cpp | 86 +++++++++++----------- src/Disks/registerDisks.cpp | 2 - src/Disks/tests/gtest_disk_cubefs.cpp | 2 +- 11 files changed, 47 insertions(+), 216 deletions(-) delete mode 100644 src/Disks/CubeFS/DiskCubeFSCheckThread.cpp delete mode 100644 src/Disks/CubeFS/DiskCubeFSCheckThread.h diff --git a/contrib/cubefs-cmake/CMakeLists.txt b/contrib/cubefs-cmake/CMakeLists.txt index abec8b08c765..61a2bdc56b96 100644 --- a/contrib/cubefs-cmake/CMakeLists.txt +++ b/contrib/cubefs-cmake/CMakeLists.txt @@ -1,15 +1,8 @@ cmake_minimum_required(VERSION 3.20) -message(STATUS "start handle contrib/cubefs-cmake/CMakeLists.txt") set(CUBEFS_DIR "${ClickHouse_SOURCE_DIR}/contrib/cubefs") -message(STATUS "start execute_process") execute_process( COMMAND make libsdk WORKING_DIRECTORY ${CUBEFS_DIR}) -add_library(_cubefs STATIC IMPORTED GLOBAL) -set_target_properties(_cubefs PROPERTIES - IMPORTED_LOCATION "${CUBEFS_DIR}/build/bin/libcfs.a" -) -add_library(ch_contrib::cubefs ALIAS _cubefs) set(FILE_PATH "${CUBEFS_DIR}/libsdk/libcfs.h") # Add configuration to ignore warnings in the header file file(STRINGS ${FILE_PATH} FILE_CONTENTS) diff --git a/programs/CMakeLists.txt b/programs/CMakeLists.txt index 629e0c0fc8a0..cca7be97b61b 100644 --- a/programs/CMakeLists.txt +++ b/programs/CMakeLists.txt @@ -24,9 +24,6 @@ option (ENABLE_CLICKHOUSE_EXTRACT_FROM_CONFIG "Configs processor (extract values # https://clickhouse.com/docs/en/operations/utilities/clickhouse-compressor/ option (ENABLE_CLICKHOUSE_COMPRESSOR "Data compressor and decompressor" ${ENABLE_CLICKHOUSE_ALL}) -# cubefs test -option (ENABLE_CLICKHOUSE_CUBEFS_TEST "Cubefs test" ${ENABLE_CLICKHOUSE_ALL}) - # https://clickhouse.com/docs/en/operations/utilities/clickhouse-copier/ option (ENABLE_CLICKHOUSE_COPIER "Inter-cluster data copying mode" ${ENABLE_CLICKHOUSE_ALL}) @@ -117,12 +114,6 @@ else() message(STATUS "Copier mode: OFF") endif() -if (ENABLE_CLICKHOUSE_CUBEFS_TEST) - message(STATUS "Cubefs test: ON") -else() - message(STATUS "Cubefs test: OFF") -endif() - if (ENABLE_CLICKHOUSE_FORMAT) message(STATUS "Format mode: ON") else() @@ -239,7 +230,6 @@ add_subdirectory (install) add_subdirectory (git-import) add_subdirectory (bash-completion) add_subdirectory (static-files-disk-uploader) -add_subdirectory (cubefs-test) if (ENABLE_CLICKHOUSE_KEEPER) add_subdirectory (keeper) @@ -271,7 +261,6 @@ if (CLICKHOUSE_ONE_SHARED) ${CLICKHOUSE_GIT_IMPORT_SOURCES} ${CLICKHOUSE_ODBC_BRIDGE_SOURCES} ${CLICKHOUSE_KEEPER_SOURCES} - ${CLICKHOUSE_CUBEFS_TEST_SOURCES} ${CLICKHOUSE_KEEPER_CONVERTER_SOURCES} ${CLICKHOUSE_STATIC_FILES_DISK_UPLOADER_SOURCES}) @@ -288,7 +277,6 @@ if (CLICKHOUSE_ONE_SHARED) ${CLICKHOUSE_GIT_IMPORT_LINK} ${CLICKHOUSE_ODBC_BRIDGE_LINK} ${CLICKHOUSE_KEEPER_LINK} - ${CLICKHOUSE_CUBEFS_TEST_LINK} ${CLICKHOUSE_KEEPER_CONVERTER_LINK} ${CLICKHOUSE_STATIC_FILES_DISK_UPLOADER_LINK}) @@ -305,7 +293,6 @@ if (CLICKHOUSE_ONE_SHARED) ${CLICKHOUSE_GIT_IMPORT_INCLUDE} ${CLICKHOUSE_ODBC_BRIDGE_INCLUDE} ${CLICKHOUSE_KEEPER_INCLUDE} - ${CLICKHOUSE_CUBEFS_TEST_INCLUDE} ${CLICKHOUSE_KEEPER_CONVERTER_INCLUDE}) set_target_properties(clickhouse-lib PROPERTIES SOVERSION ${VERSION_MAJOR}.${VERSION_MINOR} VERSION ${VERSION_SO} OUTPUT_NAME clickhouse DEBUG_POSTFIX "") @@ -343,10 +330,6 @@ if (CLICKHOUSE_SPLIT_BINARY) list (APPEND CLICKHOUSE_ALL_TARGETS clickhouse-keeper-converter) endif () - if (ENABLE_CLICKHOUSE_CUBEFS_TEST) - list (APPEND CLICKHOUSE_ALL_TARGETS clickhouse-cubefs-test) - endif () - set_target_properties(${CLICKHOUSE_ALL_TARGETS} PROPERTIES RUNTIME_OUTPUT_DIRECTORY ..) add_custom_target (clickhouse-bundle ALL DEPENDS ${CLICKHOUSE_ALL_TARGETS}) @@ -373,9 +356,6 @@ else () if (ENABLE_CLICKHOUSE_LOCAL) clickhouse_target_link_split_lib(clickhouse local) endif () - if (ENABLE_CLICKHOUSE_CUBEFS_TEST) - clickhouse_target_link_split_lib(clickhouse cubefs-test) - endif () if (ENABLE_CLICKHOUSE_BENCHMARK) clickhouse_target_link_split_lib(clickhouse benchmark) endif () @@ -436,11 +416,6 @@ else () install (FILES "${CMAKE_CURRENT_BINARY_DIR}/clickhouse-copier" DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse) list(APPEND CLICKHOUSE_BUNDLE clickhouse-copier) endif () - if (ENABLE_CLICKHOUSE_CUBEFS_TEST) - add_custom_target (clickhouse-cubefs-test ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-cubefs-test DEPENDS clickhouse) - install (FILES "${CMAKE_CURRENT_BINARY_DIR}/clickhouse-cubefs-test" DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse) - list(APPEND CLICKHOUSE_BUNDLE clickhouse-cubefs-test) - endif () if (ENABLE_CLICKHOUSE_EXTRACT_FROM_CONFIG) add_custom_target (clickhouse-extract-from-config ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-extract-from-config DEPENDS clickhouse) install (FILES "${CMAKE_CURRENT_BINARY_DIR}/clickhouse-extract-from-config" DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse) diff --git a/programs/config_tools.h.in b/programs/config_tools.h.in index 30f890045696..b97eb63b5352 100644 --- a/programs/config_tools.h.in +++ b/programs/config_tools.h.in @@ -19,4 +19,3 @@ #cmakedefine01 ENABLE_CLICKHOUSE_KEEPER #cmakedefine01 ENABLE_CLICKHOUSE_KEEPER_CONVERTER #cmakedefine01 ENABLE_CLICKHOUSE_STATIC_FILES_DISK_UPLOADER -#cmakedefine01 ENABLE_CLICKHOUSE_CUBEFS_TEST diff --git a/programs/main.cpp b/programs/main.cpp index f8573e13ce78..2cdda075ca7d 100644 --- a/programs/main.cpp +++ b/programs/main.cpp @@ -63,9 +63,6 @@ int mainEntryClickHouseKeeperConverter(int argc, char ** argv); #if ENABLE_CLICKHOUSE_STATIC_FILES_DISK_UPLOADER int mainEntryClickHouseStaticFilesDiskUploader(int argc, char ** argv); #endif -#if ENABLE_CLICKHOUSE_CUBEFS_TEST -int mainEntryClickHouseCubefsTest(int argc, char ** argv); -#endif #if ENABLE_CLICKHOUSE_INSTALL int mainEntryClickHouseInstall(int argc, char ** argv); int mainEntryClickHouseStart(int argc, char ** argv); @@ -130,9 +127,6 @@ std::pair clickhouse_applications[] = #if ENABLE_CLICKHOUSE_KEEPER_CONVERTER {"keeper-converter", mainEntryClickHouseKeeperConverter}, #endif -#if ENABLE_CLICKHOUSE_CUBEFS_TEST - {"cubefs-test", mainEntryClickHouseCubefsTest}, -#endif #if ENABLE_CLICKHOUSE_INSTALL {"install", mainEntryClickHouseInstall}, {"start", mainEntryClickHouseStart}, diff --git a/src/Disks/CubeFS/DiskCubeFS.cpp b/src/Disks/CubeFS/DiskCubeFS.cpp index 71555fc9408f..7e09b29e1b1a 100644 --- a/src/Disks/CubeFS/DiskCubeFS.cpp +++ b/src/Disks/CubeFS/DiskCubeFS.cpp @@ -528,7 +528,7 @@ void DiskCubeFS::createHardLink(const String &, const String &) } DiskCubeFS::DiskCubeFS(const String & name_, SettingsPtr settings_, UInt64 keep_free_space_bytes_) - : name(name_), settings(std::move(settings_)),keep_free_space_bytes(keep_free_space_bytes_), logger(&Poco::Logger::get("DiskCubeFS")) + : name(name_), settings(std::move(settings_)), logger(&Poco::Logger::get("DiskCubeFS")),keep_free_space_bytes(keep_free_space_bytes_) { sdk_loader = std::make_shared(settings->lib_path); id = sdk_loader->cfsNewClient(); @@ -550,8 +550,8 @@ DiskCubeFS::DiskCubeFS(const String & name_, SettingsPtr settings_, UInt64 keep_ createDirectories(""); } -DiskCubeFS::DiskCubeFS(const String & name_, ContextPtr_, SettingsPtr settings_, UInt64 keep_free_space_bytes_) -: DiskCubeFS(name_, settings_, keep_free_space_bytes_) +DiskCubeFS::DiskCubeFS(const String & name_, ContextPtr, SettingsPtr settings_, UInt64 keep_free_space_bytes_) + : DiskCubeFS(name_, settings_, keep_free_space_bytes_) { } @@ -606,24 +606,6 @@ bool DiskCubeFS::canRead(const std::string & path) return (stat.mode & S_IROTH) != 0 || geteuid() == 0; } -bool DiskCubeFS::canWrite() const noexcept -try -{ - static DiskWriteCheckData data; - String tmp_template = fs::path(disk_path) / ""; - { - auto buf = WriteBufferFromTemporaryFile::create(tmp_template); - buf->write(data.data, data.PAGE_SIZE_IN_BYTES); - buf->sync(); - } - return true; -} -catch (...) -{ - LOG_WARNING(logger, "Cannot achieve write over the disk directory: {}", disk_path); - return false; -} - bool DiskCubeFS::exists(const String & path) const { fs::path full_path = fs::path(settings->disk_path) / path; diff --git a/src/Disks/CubeFS/DiskCubeFS.h b/src/Disks/CubeFS/DiskCubeFS.h index 8e66a8ccc42c..d1add66048c9 100644 --- a/src/Disks/CubeFS/DiskCubeFS.h +++ b/src/Disks/CubeFS/DiskCubeFS.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include "sdkLoader.h" namespace DB diff --git a/src/Disks/CubeFS/DiskCubeFSCheckThread.cpp b/src/Disks/CubeFS/DiskCubeFSCheckThread.cpp deleted file mode 100644 index 7e1cb683c313..000000000000 --- a/src/Disks/CubeFS/DiskCubeFSCheckThread.cpp +++ /dev/null @@ -1,70 +0,0 @@ -#include - -#include -#include -#include - -namespace DB -{ -static const auto DISK_CHECK_ERROR_SLEEP_MS = 1000; -static const auto DISK_CHECK_ERROR_RETRY_TIME = 3; - -DiskCubeFSCheckThread::DiskCubeFSCheckThread(DiskCubeFS * disk_, ContextPtr context_, UInt64 local_disk_check_period_ms) - : WithContext(context_) - , disk(std::move(disk_)) - , check_period_ms(local_disk_check_period_ms) - , log(&Poco::Logger::get(fmt::format("DiskCubeFSCheckThread({})", disk->getName()))) -{ - task = getContext()->getSchedulePool().createTask(log->name(), [this] { run(); }); -} - -void DiskCubeFSCheckThread::startup() -{ - need_stop = false; - retry = 0; - task->activateAndSchedule(); -} - -void DiskCubeFSCheckThread::run() -{ - if (need_stop) - return; - - bool can_read = disk->canRead(); - bool can_write = disk->canWrite(); - if (can_read) - { - if (disk->broken) - LOG_INFO(log, "Disk {0} seems to be fine. It can be recovered using `SYSTEM RESTART DISK {0}`", disk->getName()); - retry = 0; - if (can_write) - disk->readonly = false; - else - { - disk->readonly = true; - LOG_INFO(log, "Disk {} is readonly", disk->getName()); - } - task->scheduleAfter(check_period_ms); - } - else if (!disk->broken && retry < DISK_CHECK_ERROR_RETRY_TIME) - { - ++retry; - task->scheduleAfter(DISK_CHECK_ERROR_SLEEP_MS); - } - else - { - retry = 0; - disk->broken = true; - LOG_INFO(log, "Disk {} is broken", disk->getName()); - task->scheduleAfter(check_period_ms); - } -} - -void DiskCubeFSCheckThread::shutdown() -{ - need_stop = true; - task->deactivate(); - LOG_TRACE(log, "DiskCubeFSCheck thread finished"); -} - -} diff --git a/src/Disks/CubeFS/DiskCubeFSCheckThread.h b/src/Disks/CubeFS/DiskCubeFSCheckThread.h deleted file mode 100644 index ca471558a1de..000000000000 --- a/src/Disks/CubeFS/DiskCubeFSCheckThread.h +++ /dev/null @@ -1,39 +0,0 @@ -#pragma once - -#include -#include - -namespace Poco -{ -class Logger; -} - -namespace DB -{ -class DiskCubeFS; - -class DiskCubeFSCheckThread : WithContext -{ -public: - friend class DiskCubeFS; - - DiskCubeFSCheckThread(DiskCubeFS * disk_, ContextPtr context_, UInt64 local_disk_check_period_ms); - - void startup(); - - void shutdown(); - -private: - bool check(); - void run(); - - DiskCubeFS * disk; - size_t check_period_ms; - Poco::Logger * log; - std::atomic need_stop{false}; - - BackgroundSchedulePool::TaskHolder task; - size_t retry{}; -}; - -} diff --git a/src/Disks/CubeFS/registerDiskCubeFS.cpp b/src/Disks/CubeFS/registerDiskCubeFS.cpp index af673b566d41..77286c4093ca 100644 --- a/src/Disks/CubeFS/registerDiskCubeFS.cpp +++ b/src/Disks/CubeFS/registerDiskCubeFS.cpp @@ -10,54 +10,52 @@ namespace ErrorCodes extern const int BAD_ARGUMENTS; } - DiskCubeFS::SettingsPtr getSettings(const String & name,ContextPtr context,const Poco::Util::AbstractConfiguration & config, const String & config_prefix) - { - String disk_path=config.getString(config_prefix + ".disk_path"); +DiskCubeFS::SettingsPtr +getSettings(const String & name, ContextPtr context, const Poco::Util::AbstractConfiguration & config, const String & config_prefix) +{ + String disk_path = config.getString(config_prefix + ".disk_path"); if (disk_path.empty()) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Disk path can not be empty. Disk {}", name); - if (disk_path.back() != '/') - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Disk path must end with /. Disk {}", name); - if (disk_path == context->getPath()) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Disk path ('{}') cannot be equal to . Use disk instead.", path); - LOG_DEBUG(&Poco::Logger::get("DiskCubeFS"), "volumn name: {}", config.getString(config_prefix + ".vol_name")); - LOG_DEBUG(&Poco::Logger::get("DiskCubeFS"), "master address: {}", config.getString(config_prefix + ".master_addr")); - LOG_DEBUG(&Poco::Logger::get("DiskCubeFS"), "log directory: {}", config.getString(config_prefix + ".log_dir")); - LOG_DEBUG(&Poco::Logger::get("DiskCubeFS"), "log level: {}", config.getString(config_prefix + ".log_level")); - LOG_DEBUG(&Poco::Logger::get("DiskCubeFS"), "access key: {}", config.getString(config_prefix + ".access_key")); - LOG_DEBUG(&Poco::Logger::get("DiskCubeFS"), "secret key: {}", config.getString(config_prefix + ".secret_key")); - LOG_DEBUG(&Poco::Logger::get("DiskCubeFS"), "push address: {}", config.getString(config_prefix + ".push_addr")); - LOG_DEBUG(&Poco::Logger::get("DiskCubeFS"), "disk path: {}", disk_path); - LOG_DEBUG(&Poco::Logger::get("DiskCubeFS"), "lib path: {}", config.getString(config_prefix + ".lib_path")); - return std::make_shared( - config.getString(config_prefix + ".vol_name"), - config.getString(config_prefix + ".master_addr"), - config.getString(config_prefix + ".log_dir"), - config.getString(config_prefix + ".log_level", "debug"), - config.getString(config_prefix + ".access_key"), - config.getString(config_prefix + ".secret_key"), - config.getString(config_prefix + ".push_addr"), - disk_path, - config.getString(config_prefix + ".lib_path")); - } + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Disk path can not be empty. Disk {}", name); + if (disk_path.back() != '/') + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Disk path must end with /. Disk {}", name); + if (disk_path == context->getPath()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Disk path ('{}') cannot be equal to . Use disk instead.", disk_path); + LOG_DEBUG(&Poco::Logger::get("DiskCubeFS"), "volumn name: {}", config.getString(config_prefix + ".vol_name")); + LOG_DEBUG(&Poco::Logger::get("DiskCubeFS"), "master address: {}", config.getString(config_prefix + ".master_addr")); + LOG_DEBUG(&Poco::Logger::get("DiskCubeFS"), "log directory: {}", config.getString(config_prefix + ".log_dir")); + LOG_DEBUG(&Poco::Logger::get("DiskCubeFS"), "log level: {}", config.getString(config_prefix + ".log_level")); + LOG_DEBUG(&Poco::Logger::get("DiskCubeFS"), "access key: {}", config.getString(config_prefix + ".access_key")); + LOG_DEBUG(&Poco::Logger::get("DiskCubeFS"), "secret key: {}", config.getString(config_prefix + ".secret_key")); + LOG_DEBUG(&Poco::Logger::get("DiskCubeFS"), "push address: {}", config.getString(config_prefix + ".push_addr")); + LOG_DEBUG(&Poco::Logger::get("DiskCubeFS"), "disk path: {}", disk_path); + LOG_DEBUG(&Poco::Logger::get("DiskCubeFS"), "lib path: {}", config.getString(config_prefix + ".lib_path")); + return std::make_shared( + config.getString(config_prefix + ".vol_name"), + config.getString(config_prefix + ".master_addr"), + config.getString(config_prefix + ".log_dir"), + config.getString(config_prefix + ".log_level", "debug"), + config.getString(config_prefix + ".access_key"), + config.getString(config_prefix + ".secret_key"), + config.getString(config_prefix + ".push_addr"), + disk_path, + config.getString(config_prefix + ".lib_path")); +} void registerDiskCubeFS(DiskFactory & factory) { - auto creator = [](const String & name, - const Poco::Util::AbstractConfiguration & config, - const String & config_prefix, - ContextPtr context, - const DisksMap & /*map*/) -> DiskPtr - { - auto settings = getSettings(name,context,config, config_prefix); - UInt64 keep_free_space_bytes; - loadCubeFSChangeableConfig( config, - config_prefix, - keep_free_space_bytes) - DiskPtr cubefsDisk = std::make_shared(name, context, settings,keep_free_space_bytes); - cubefsDisk->startup(); - return cubefsDisk; - }; - factory.registerDiskType("cubefs", creator); + auto creator = [](const String & name, + const Poco::Util::AbstractConfiguration & config, + const String & config_prefix, + ContextPtr context, + const DisksMap & /*map*/) -> DiskPtr + { + auto settings = getSettings(name, context, config, config_prefix); + UInt64 keep_free_space_bytes; + loadCubeFSChangeableConfig(config, config_prefix, keep_free_space_bytes); + DiskPtr cubefsDisk = std::make_shared(name, context, settings, keep_free_space_bytes); + return cubefsDisk; + }; + factory.registerDiskType("cubefs", creator); } } diff --git a/src/Disks/registerDisks.cpp b/src/Disks/registerDisks.cpp index 1588ffd5f51c..11b6cdfae64e 100644 --- a/src/Disks/registerDisks.cpp +++ b/src/Disks/registerDisks.cpp @@ -9,7 +9,6 @@ namespace DB void registerDiskLocal(DiskFactory & factory); void registerDiskMemory(DiskFactory & factory); -void registerDiskCubeFS(DiskFactory & factory); #if USE_AWS_S3 void registerDiskS3(DiskFactory & factory); #endif @@ -37,7 +36,6 @@ void registerDisks() registerDiskLocal(factory); registerDiskMemory(factory); - registerDiskCubeFS(factory); #if USE_AWS_S3 registerDiskS3(factory); diff --git a/src/Disks/tests/gtest_disk_cubefs.cpp b/src/Disks/tests/gtest_disk_cubefs.cpp index 7b5dfb0e1481..cb603304c941 100644 --- a/src/Disks/tests/gtest_disk_cubefs.cpp +++ b/src/Disks/tests/gtest_disk_cubefs.cpp @@ -34,7 +34,7 @@ class DiskTestCubeFS : public testing::Test auto settings = std::make_shared( vol_name, master_addr, log_dir, log_level, access_key, secret_key, push_addr, path, lib_path); std::cout << "set client info successfully " << std::endl; - disk = std::make_shared(disk_name, settings); + disk = std::make_shared(disk_name, settings, 0); std::cout << "initial cubefs disk successfully " << std::endl; } void TearDown() override