diff --git a/.gitmodules b/.gitmodules
index 6c9e66f9cbc7..4ad531684245 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -262,3 +262,7 @@
 [submodule "contrib/minizip-ng"]
 	path = contrib/minizip-ng
 	url = https://github.com/zlib-ng/minizip-ng
+[submodule "contrib/cubefs"]
+	path = contrib/cubefs
+	url = https://github.com/RunningXie/cubefs.git
+	branch = fix_sdk_compilation
diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt
index 9cf307c473ee..e1e18fcd6e1b 100644
--- a/contrib/CMakeLists.txt
+++ b/contrib/CMakeLists.txt
@@ -135,6 +135,7 @@ add_contrib (libpq-cmake libpq)
 add_contrib (nuraft-cmake NuRaft)
 add_contrib (fast_float-cmake fast_float)
 add_contrib (datasketches-cpp-cmake datasketches-cpp)
+add_contrib (cubefs-cmake cubefs)
 
 option(ENABLE_NLP "Enable NLP functions support" ${ENABLE_LIBRARIES})
 if (ENABLE_NLP)
diff --git a/contrib/cubefs b/contrib/cubefs
new file mode 160000
index 000000000000..5eb497912217
--- /dev/null
+++ b/contrib/cubefs
@@ -0,0 +1 @@
+Subproject commit 5eb4979122170f02e2c2fc4e14f46f6092c4e08e
diff --git a/contrib/cubefs-cmake/CMakeLists.txt b/contrib/cubefs-cmake/CMakeLists.txt
new file mode 100644
index 000000000000..61a2bdc56b96
--- /dev/null
+++ b/contrib/cubefs-cmake/CMakeLists.txt
@@ -0,0 +1,43 @@
+# CubeFS client SDK (libcfs). Defines the imported target ch_contrib::cubefs that
+# src/CMakeLists.txt and programs/cubefs-test test for and link against.
+
+option (ENABLE_CUBEFS "Enable CubeFS disk support" ${ENABLE_LIBRARIES})
+
+if (NOT ENABLE_CUBEFS)
+    message (STATUS "Not using CubeFS")
+    return ()
+endif ()
+
+set (CUBEFS_SOURCE_DIR "${ClickHouse_SOURCE_DIR}/contrib/cubefs")
+set (CUBEFS_INCLUDE_DIR "${CUBEFS_SOURCE_DIR}/libsdk")
+# NOTE(review): the output directory of `make libsdk` is not fixed here; both the SDK
+# directory and build/bin are searched - confirm against the submodule Makefile.
+set (CUBEFS_LIBRARY_DIRS "${CUBEFS_INCLUDE_DIR}" "${CUBEFS_SOURCE_DIR}/build/bin")
+
+find_library (CUBEFS_LIBRARY NAMES cfs PATHS ${CUBEFS_LIBRARY_DIRS} NO_DEFAULT_PATH)
+
+# Build the SDK only while its outputs are missing, so that re-running CMake does not
+# rebuild it every time. The exit status is checked instead of being ignored.
+if (NOT CUBEFS_LIBRARY OR NOT EXISTS "${CUBEFS_INCLUDE_DIR}/libcfs.h")
+    execute_process (
+        COMMAND make libsdk
+        WORKING_DIRECTORY "${CUBEFS_SOURCE_DIR}"
+        RESULT_VARIABLE CUBEFS_BUILD_RESULT)
+    if (NOT "${CUBEFS_BUILD_RESULT}" STREQUAL "0")
+        message (WARNING "Not using CubeFS: 'make libsdk' failed (${CUBEFS_BUILD_RESULT})")
+        return ()
+    endif ()
+    find_library (CUBEFS_LIBRARY NAMES cfs PATHS ${CUBEFS_LIBRARY_DIRS} NO_DEFAULT_PATH)
+endif ()
+
+if (NOT CUBEFS_LIBRARY OR NOT EXISTS "${CUBEFS_INCLUDE_DIR}/libcfs.h")
+    message (WARNING "Not using CubeFS: libcfs library or libcfs.h was not produced by 'make libsdk'")
+    return ()
+endif ()
+
+add_library (_cubefs UNKNOWN IMPORTED GLOBAL)
+set_target_properties (_cubefs PROPERTIES IMPORTED_LOCATION "${CUBEFS_LIBRARY}")
+# SYSTEM: diagnostics such as -Wreserved-identifier and -Wmacro-redefined raised inside
+# libcfs.h are suppressed by the compiler, so the header is no longer patched in place.
+target_include_directories (_cubefs SYSTEM INTERFACE "${CUBEFS_INCLUDE_DIR}")
+add_library (ch_contrib::cubefs ALIAS _cubefs)
diff --git a/programs/cubefs-test/CMakeLists.txt b/programs/cubefs-test/CMakeLists.txt
new file mode 100644
index 000000000000..54f1628419a6
--- /dev/null
+++ b/programs/cubefs-test/CMakeLists.txt
@@ -0,0 +1,7 @@
+# Built only when contrib/cubefs-cmake provided the SDK target.
+if (TARGET ch_contrib::cubefs)
+    set (CLICKHOUSE_CUBEFS_TEST_SOURCES cubefs-test.cpp clickhouse-cubefs-test.cpp)
+    set (CLICKHOUSE_CUBEFS_TEST_LINK PRIVATE dbms ch_contrib::cubefs)
+    set (CLICKHOUSE_CUBEFS_TEST_INCLUDE PUBLIC ${ClickHouse_SOURCE_DIR}/contrib/cubefs/libsdk)
+    clickhouse_program_add(cubefs-test)
+endif ()
diff --git a/programs/cubefs-test/clickhouse-cubefs-test.cpp b/programs/cubefs-test/clickhouse-cubefs-test.cpp
new file mode 100644
index 000000000000..384aea9916dc
--- /dev/null
+++ b/programs/cubefs-test/clickhouse-cubefs-test.cpp
@@ -0,0 +1,5 @@
+int mainEntryClickHouseCubefsTest(int argc, char ** argv);
+int main(int argc_, char ** argv_)
+{
+    return mainEntryClickHouseCubefsTest(argc_, argv_);
+}
diff --git a/programs/cubefs-test/cubefs-dlopen.cpp b/programs/cubefs-test/cubefs-dlopen.cpp
new file mode 100644
index 000000000000..996106eff82b
--- /dev/null
+++ b/programs/cubefs-test/cubefs-dlopen.cpp
@@ -0,0 +1,159 @@
+#include <cstdint>
+#include <cstdlib>
+#include <cstring>
+#include <iostream>
+
+#include <dlfcn.h>
+#include <fcntl.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+
+/// Smoke test for the CubeFS SDK loaded at run time: checks that ./libcfs.so exports the
+/// expected cfs_* functions, starts a client and writes/reads back one small file.
+/// Connection settings are read from the environment, so that no credentials or host
+/// names are stored in the source tree.
+
+namespace
+{
+
+using CfsNewClientFunction = int64_t (*)();
+using CfsSetClientFunction = int (*)(int64_t, char *, char *);
+using CfsStartClientFunction = int (*)(int64_t);
+using CfsCloseClientFunction = void (*)(int64_t);
+using CfsOpenFunction = int (*)(int64_t, char *, int, mode_t);
+using CfsCloseFunction = void (*)(int64_t, int);
+using CfsWriteFunction = ssize_t (*)(int64_t, int, void *, size_t, off_t);
+using CfsReadFunction = ssize_t (*)(int64_t, int, void *, size_t, off_t);
+
+/// Every symbol the library must export; those not called below are only checked for presence.
+const char * const required_symbols[] = {
+    "cfs_new_client", "cfs_set_client", "cfs_start_client", "cfs_close_client", "cfs_chdir",
+    "cfs_getcwd", "cfs_getattr", "cfs_setattr", "cfs_open", "cfs_flush",
+    "cfs_close", "cfs_write", "cfs_read", "cfs_lsdir", "cfs_mkdirs",
+    "cfs_rmdir", "cfs_unlink", "cfs_rename", "cfs_fchmod", "cfs_getsummary",
+};
+
+/// SDK option name and the environment variable that supplies its value.
+struct ClientOption
+{
+    const char * key;
+    const char * env;
+};
+
+const ClientOption client_options[] = {
+    {"volName", "CUBEFS_VOL_NAME"},
+    {"masterAddr", "CUBEFS_MASTER_ADDR"},
+    {"logDir", "CUBEFS_LOG_DIR"},
+    {"logLevel", "CUBEFS_LOG_LEVEL"},
+    {"accessKey", "CUBEFS_ACCESS_KEY"},
+    {"secretKey", "CUBEFS_SECRET_KEY"},
+    {"pushAddr", "CUBEFS_PUSH_ADDR"},
+};
+
+template <typename Function>
+Function resolve(void * handle, const char * symbol)
+{
+    return reinterpret_cast<Function>(dlsym(handle, symbol));
+}
+
+/// Passes every option to the client; returns false if a variable is unset or the SDK rejects it.
+bool configureClient(CfsSetClientFunction set_client, int64_t client_id)
+{
+    for (const auto & option : client_options)
+    {
+        const char * value = std::getenv(option.env);
+        if (!value)
+        {
+            std::cerr << "Environment variable " << option.env << " is not set" << std::endl;
+            return false;
+        }
+        /// The SDK takes non-const char *. Ownership of the arguments is not visible from
+        /// here, so the copies are deliberately not freed.
+        if (set_client(client_id, strdup(option.key), strdup(value)) != 0)
+        {
+            std::cerr << "Failed to set client option " << option.key << std::endl;
+            return false;
+        }
+    }
+    return true;
+}
+
+/// Writes a short string to /test_dir/file.txt and reads it back, always closing the descriptor.
+bool readWriteRoundTrip(void * handle, int64_t client_id)
+{
+    auto cfs_open = resolve<CfsOpenFunction>(handle, "cfs_open");
+    auto cfs_close = resolve<CfsCloseFunction>(handle, "cfs_close");
+    auto cfs_write = resolve<CfsWriteFunction>(handle, "cfs_write");
+    auto cfs_read = resolve<CfsReadFunction>(handle, "cfs_read");
+
+    int fd = cfs_open(client_id, strdup("/test_dir/file.txt"), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR);
+    if (fd < 0)
+    {
+        std::cerr << "Failed to open file" << std::endl;
+        return false;
+    }
+
+    const char * data = "Hello, world!";
+    /// Zero-filled, and one byte is never read into, so the content stays zero-terminated.
+    char buffer[1024] = {};
+    bool ok = false;
+    if (cfs_write(client_id, fd, const_cast<char *>(data), strlen(data), 0) < 0)
+        std::cerr << "Failed to write file" << std::endl;
+    else if (cfs_read(client_id, fd, buffer, sizeof(buffer) - 1, 0) < 0)
+        std::cerr << "Failed to read file" << std::endl;
+    else
+    {
+        std::cout << "Read file content: " << buffer << std::endl;
+        ok = true;
+    }
+
+    cfs_close(client_id, fd);
+    return ok;
+}
+
+}
+
+int main()
+{
+    void * handle = dlopen("./libcfs.so", RTLD_LAZY);
+    if (!handle)
+    {
+        std::cerr << "Failed to open the dynamic library: " << dlerror() << std::endl;
+        return 1;
+    }
+
+    for (const char * symbol : required_symbols)
+    {
+        if (!dlsym(handle, symbol))
+        {
+            std::cerr << "Failed to find function " << symbol << ": " << dlerror() << std::endl;
+            dlclose(handle);
+            return 1;
+        }
+    }
+
+    auto cfs_new_client = resolve<CfsNewClientFunction>(handle, "cfs_new_client");
+    auto cfs_set_client = resolve<CfsSetClientFunction>(handle, "cfs_set_client");
+    auto cfs_start_client = resolve<CfsStartClientFunction>(handle, "cfs_start_client");
+    auto cfs_close_client = resolve<CfsCloseClientFunction>(handle, "cfs_close_client");
+
+    int64_t client_id = cfs_new_client();
+    if (client_id <= 0)
+    {
+        std::cerr << "Failed to create client" << std::endl;
+        dlclose(handle);
+        return 1;
+    }
+
+    bool ok = configureClient(cfs_set_client, client_id);
+    if (ok && cfs_start_client(client_id) != 0)
+    {
+        std::cerr << "Failed to start client" << std::endl;
+        ok = false;
+    }
+    ok = ok && readWriteRoundTrip(handle, client_id);
+
+    cfs_close_client(client_id);
+    dlclose(handle);
+    return ok ? 0 : 1;
+}
diff --git a/programs/cubefs-test/cubefs-test.cpp b/programs/cubefs-test/cubefs-test.cpp
new file mode 100644
index 000000000000..a1c54ed1baa2
--- /dev/null
+++ b/programs/cubefs-test/cubefs-test.cpp
@@ -0,0 +1,112 @@
+#include <cstdio>
+#include <cstdlib>
+#include <cstring>
+#include <iostream>
+#include <thread>
+
+#include <fcntl.h>
+#include <sys/stat.h>
+
+#include <libcfs.h>
+
+namespace
+{
+
+/// SDK option name and the environment variable that supplies its value, so that
+/// no credentials or host names are stored in the source tree.
+struct ClientOption
+{
+    const char * key;
+    const char * env;
+};
+
+const ClientOption client_options[] = {
+    {"volName", "CUBEFS_VOL_NAME"},
+    {"masterAddr", "CUBEFS_MASTER_ADDR"},
+    {"logDir", "CUBEFS_LOG_DIR"},
+    {"logLevel", "CUBEFS_LOG_LEVEL"},
+    {"accessKey", "CUBEFS_ACCESS_KEY"},
+    {"secretKey", "CUBEFS_SECRET_KEY"},
+    {"pushAddr", "CUBEFS_PUSH_ADDR"},
+};
+
+}
+
+/// Creates a client configured from the environment, writes a short string to
+/// /test_dir/file.txt and reads it back. Every failure is reported with printf.
+void readWriteTest()
+{
+    int64_t id = cfs_new_client();
+    if (id <= 0)
+    {
+        printf("Failed to create client\n");
+        return;
+    }
+
+    for (const auto & option : client_options)
+    {
+        const char * value = std::getenv(option.env);
+        if (!value)
+        {
+            printf("Environment variable %s is not set\n", option.env);
+            cfs_close_client(id);
+            return;
+        }
+        /// cfs_set_client takes non-const char *. Ownership of the arguments is not
+        /// visible from here, so the copies are deliberately not freed.
+        if (cfs_set_client(id, strdup(option.key), strdup(value)) != 0)
+        {
+            printf("Failed to set client info\n");
+            cfs_close_client(id);
+            return;
+        }
+    }
+
+    if (cfs_start_client(id) != 0)
+    {
+        printf("Failed to start client \n");
+        cfs_close_client(id);
+        return;
+    }
+
+    int fd = cfs_open(id, strdup("/test_dir/file.txt"), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR);
+    if (fd < 0)
+    {
+        printf("Failed to open file\n");
+        cfs_close_client(id);
+        return;
+    }
+
+    const char * data = "Hello, world!";
+    if (cfs_write(id, fd, static_cast<void *>(const_cast<char *>(data)), strlen(data), 0) < 0)
+    {
+        printf("Failed to write file\n");
+        cfs_close(id, fd);
+        cfs_close_client(id);
+        return;
+    }
+
+    char buffer[1024];
+    memset(buffer, 0, sizeof(buffer));
+    /// One byte is never read into, so the content stays zero-terminated for printf.
+    if (cfs_read(id, fd, static_cast<void *>(buffer), sizeof(buffer) - 1, 0) < 0)
+    {
+        printf("Failed to read file\n");
+        cfs_close(id, fd);
+        cfs_close_client(id);
+        return;
+    }
+
+    printf("Read file content: %s\n", buffer);
+    cfs_close(id, fd);
+    cfs_close_client(id);
+}
+
+int mainEntryClickHouseCubefsTest(int argc, char ** argv)
+{
+    std::thread worker(readWriteTest);
+    worker.join();
+    std::cout << "argc = " << argc << std::endl;
+    std::cout << "argv = " << argv << std::endl;
+    return 0;
+}
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 77fe4376fc80..73bebd9a0804 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -81,6 +81,10 @@
 add_headers_and_sources(clickhouse_common_io IO)
 add_headers_and_sources(clickhouse_common_io IO/Archives)
 add_headers_and_sources(clickhouse_common_io IO/S3)
+# Reference the CubeFS SDK directory only when the optional contrib target is defined.
+if (TARGET ch_contrib::cubefs)
+    add_headers_and_sources(clickhouse_common_io ${ClickHouse_SOURCE_DIR}/contrib/cubefs/libsdk)
+endif()
 list (REMOVE_ITEM clickhouse_common_io_sources Common/malloc.cpp Common/new_delete.cpp)
 
 add_headers_and_sources(dbms Disks/IO)
@@ -113,6 +117,11 @@ if (TARGET ch_contrib::aws_s3)
     add_headers_and_sources(dbms Disks/S3)
 endif()
 
+if (TARGET ch_contrib::cubefs)
+    add_headers_and_sources(dbms Disks/CubeFS)
+    add_headers_and_sources(dbms ${ClickHouse_SOURCE_DIR}/contrib/cubefs/libsdk)
+endif()
+
 if (TARGET ch_contrib::azure_sdk)
     add_headers_and_sources(dbms Disks/AzureBlobStorage)
 endif()
@@ -308,1 +317,5 @@ endmacro ()
 target_include_directories (clickhouse_common_io PUBLIC "${ClickHouse_SOURCE_DIR}/src" "${ClickHouse_BINARY_DIR}/src")
+# Third-party header directory: added as SYSTEM, and only when the CubeFS SDK target is defined.
+if (TARGET ch_contrib::cubefs)
+    target_include_directories (clickhouse_common_io SYSTEM PUBLIC "${ClickHouse_SOURCE_DIR}/contrib/cubefs/libsdk")
+endif ()
@@ -466,6 +479,10 @@ if (TARGET ch_contrib::aws_s3)
     target_link_libraries
(clickhouse_common_io PUBLIC ch_contrib::aws_s3) endif() +if (TARGET ch_contrib::cubefs) + target_link_libraries (clickhouse_common_io PUBLIC ch_contrib::cubefs) +endif() + if (TARGET ch_contrib::azure_sdk) target_link_libraries (clickhouse_common_io PUBLIC ch_contrib::azure_sdk) endif() diff --git a/src/Core/Settings.h b/src/Core/Settings.h index ca6839494f2e..6c731ba61dad 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -34,6 +34,7 @@ class IColumn; */ #define COMMON_SETTINGS(M) \ + M(String, handle_remove_error_path, "", "When remove_all throws the directory not empty exception, the undeleted data will be moved to the the setting directory. Empty means that the exception will not be processed.", 0) \ M(UInt64, min_compress_block_size, 65536, "The actual size of the block to compress, if the uncompressed data less than max_compress_block_size is no less than this value and no less than the volume of data for one mark.", 0) \ M(UInt64, max_compress_block_size, 1048576, "The maximum size of blocks of uncompressed data before compressing for writing to a table.", 0) \ M(UInt64, max_block_size, DEFAULT_BLOCK_SIZE, "Maximum block size for reading", 0) \ diff --git a/src/Disks/CubeFS/DiskCubeFS.cpp b/src/Disks/CubeFS/DiskCubeFS.cpp index 3ec501661a12..7e09b29e1b1a 100644 --- a/src/Disks/CubeFS/DiskCubeFS.cpp +++ b/src/Disks/CubeFS/DiskCubeFS.cpp @@ -1,26 +1,13 @@ -#include -#include -#include -#include -#include - -#include -#include +#include "DiskCubeFS.h" +#include +#include +#include +#include #include -#include -#include - -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include +namespace CurrentMetrics +{ +extern const Metric DiskSpaceReservedForMerge; +} namespace DB { @@ -35,117 +22,679 @@ namespace ErrorCodes extern const int CANNOT_UNLINK; extern const int CANNOT_RMDIR; extern const int BAD_ARGUMENTS; - extern const int CANNOT_STAT; + extern const int CANNOT_CREATE_DIRECTORY; + extern const int 
CANNOT_STATVFS; + extern const int CANNOT_OPEN_FILE; + extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR; + extern const int CANNOT_CLOSE_FILE; + extern const int ATOMIC_RENAME_FAIL; + extern const int FILE_ALREADY_EXISTS; + extern const int DIRECTORY_DOESNT_EXIST; + extern const int FILE_DOESNT_EXIST; } -void loadDiskCubeFSConfig(const String & name, - const Poco::Util::AbstractConfiguration & config, - const String & config_prefix, - ContextPtr context, - String & path, - UInt64 & keep_free_space_bytes) +const int default_readdir_count = 10000; +std::mutex DiskCubeFS::reservation_mutex; +using DiskCubeFSPtr = std::shared_ptr; + +class DiskCubeFSReservation : public IReservation { - path = config.getString(config_prefix + ".path", ""); - if (name == "default") +public: + DiskCubeFSReservation(const DiskCubeFSPtr & disk_, UInt64 size_) + : disk(disk_), size(size_), metric_increment(CurrentMetrics::DiskSpaceReservedForMerge, size_) { - if (!path.empty()) - throw Exception( - "\"default\" disk path should be provided in not it ", - ErrorCodes::BAD_ARGUMENTS); - path = context->getPath(); } - else + + UInt64 getSize() const override { return size; } + + DiskPtr getDisk(size_t i) const override + { + if (i != 0) + throw Exception("Can't use i != 0 with single disk reservation. 
It's a bug", ErrorCodes::LOGICAL_ERROR); + return disk; + } + + Disks getDisks() const override { return {disk}; } + + void update(UInt64 new_size) override + { + std::lock_guard lock(DiskCubeFS::reservation_mutex); + disk->reserved_bytes -= size; + size = new_size; + disk->reserved_bytes += size; + } + + ~DiskCubeFSReservation() override + { + try + { + std::lock_guard lock(DiskCubeFS::reservation_mutex); + if (disk->reserved_bytes < size) + { + disk->reserved_bytes = 0; + LOG_ERROR(&Poco::Logger::get("DiskCubeFS"), "Unbalanced reservations size for disk '{}'.", disk->getName()); + } + else + { + disk->reserved_bytes -= size; + } + + if (disk->reservation_count == 0) + LOG_ERROR(&Poco::Logger::get("DiskCubeFS"), "Unbalanced reservation count for disk '{}'.", disk->getName()); + else + --disk->reservation_count; + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + } + +private: + DiskCubeFSPtr disk; + UInt64 size; + CurrentMetrics::Increment metric_increment; +}; + + +ReservationPtr DiskCubeFS::reserve(UInt64 bytes) +{ + LOG_DEBUG(logger, "Got reserve request, bytes: {}", bytes); + if (!tryReserve(bytes)) + return {}; + return std::make_unique(std::static_pointer_cast(shared_from_this()), bytes); +} + +bool DiskCubeFS::tryReserve(UInt64 bytes) +{ + std::lock_guard lock(DiskCubeFS::reservation_mutex); + if (bytes == 0) + { + LOG_DEBUG(logger, "Reserving 0 bytes on disk {}", backQuote(name)); + ++reservation_count; + return true; + } + + auto available_space = getTotalSpace(); + UInt64 unreserved_space = available_space - std::min(available_space, reserved_bytes); + if (unreserved_space >= bytes) + { + LOG_DEBUG( + logger, "Reserving {} on disk {}, having unreserved {}.", ReadableSize(bytes), backQuote(name), ReadableSize(unreserved_space)); + ++reservation_count; + reserved_bytes += bytes; + return true; + } + LOG_DEBUG(logger, "DiskCubeFS::tryReserve return false"); + return false; +} + +UInt64 DiskCubeFS::getTotalSpace() const +{ + 
LOG_DEBUG(logger, "Use mock getTotalSpace function"); + return 419585155072; + //throwFromErrnoWithPath("DiskCubeFs does not support function getTotalSpace!", settings->disk_path, ErrorCodes::LOGICAL_ERROR); +} + +UInt64 DiskCubeFS::getAvailableSpace() const +{ + LOG_DEBUG(logger, "Use mock getAvailableSpace function"); + return 151697231872; + //throwFromErrnoWithPath("DiskCubeFs does not support function getAvailableSpace!", settings->disk_path, ErrorCodes::LOGICAL_ERROR); +} + +UInt64 DiskCubeFS::getUnreservedSpace() const +{ + LOG_DEBUG(logger, "Use mock getUnreservedSpace function"); + return 151697231872; + //throwFromErrnoWithPath("DiskCubeFs does not support function getUnreservedSpace!", settings->disk_path, ErrorCodes::LOGICAL_ERROR); +} + +bool DiskCubeFS::isFile(const String & path) const +{ + cfs_stat_info stat = getFileAttributes(path); + if (S_ISREG(stat.mode)) + return true; + + return false; +} + +bool DiskCubeFS::isDirectory(const String & path) const +{ + cfs_stat_info stat = getFileAttributes(path); + if (S_ISDIR(stat.mode)) + return true; + + return false; +} + +size_t DiskCubeFS::getFileSize(const String & path) const +{ + cfs_stat_info stat = getFileAttributes(path); + return stat.size; +} + +cfs_stat_info DiskCubeFS::getFileAttributes(const String & relative_path) const +{ + cfs_stat_info stat; + fs::path full_path = fs::path(settings->disk_path) / relative_path; + int result = sdk_loader->cfsGetattr(id, const_cast(full_path.c_str()), &stat); + if (result != 0) + { + int error_code = errno; + std::string error_message = strerror(error_code); + std::string full_error_message = "Failed to get file attribute: " + full_path.string() + ": " + error_message; + throwFromErrnoWithPath(full_error_message, full_path, ErrorCodes::CANNOT_STATVFS); + } + return stat; +} + +fs::path DiskCubeFS::getParentPath(const String & path) +{ + fs::path filePath(path); + fs::path parentPath = filePath.parent_path(); + + if (path.back() == '/' && parentPath != 
filePath) + { + parentPath = parentPath.parent_path(); + } + return parentPath; +} + +void DiskCubeFS::createDirectory(const String & path) +{ + fs::path parent_dir = getParentPath(path); + if (!exists(parent_dir)) + { + throwFromErrnoWithPath( + "Parent directory does not exist [" + parent_dir.string() + "]", parent_dir, ErrorCodes::DIRECTORY_DOESNT_EXIST); + } + fs::path full_path = fs::path(settings->disk_path) / path; + createDirectories(path); +} + +void DiskCubeFS::createDirectories(const String & path) +{ + fs::path full_path = fs::path(settings->disk_path) / path; + int result = sdk_loader->cfsMkdirs(id, const_cast(full_path.string().c_str()), O_RDWR | O_CREAT); + if (result != 0) { - if (path.empty()) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Disk path can not be empty. Disk {}", name); - if (path.back() != '/') - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Disk path must end with /. Disk {}", name); - if (path == context->getPath()) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Disk path ('{}') cannot be equal to . 
Use disk instead.", path); + throwFromErrnoWithPath("Cannot mkdir: " + full_path.string(), full_path, ErrorCodes::CANNOT_CREATE_DIRECTORY); } +} + +void DiskCubeFS::clearDirectory(const String & path) +{ + std::vector file_names; + listFiles(path, file_names); + for (const auto & filename : file_names) + { + fs::path file_path = fs::path(settings->disk_path) / filename; - bool has_space_ratio = config.has(config_prefix + ".keep_free_space_ratio"); + int result = sdk_loader->cfsUnlink(id, const_cast(file_path.string().c_str())); + if (result < 0) + { + throwFromErrnoWithPath("Cannot unlink file: " + file_path.string(), file_path, ErrorCodes::CANNOT_UNLINK); + } + } +} + +void DiskCubeFS::listFiles(const String & path, std::vector & file_names) +{ + file_names.clear(); + fs::path full_path = fs::path(settings->disk_path) / path; + int fd = -1; + fd = sdk_loader->cfsOpen(id, const_cast(full_path.string().c_str()), 0, 0); + if (fd < 0) + { + throwFromErrnoWithPath("Cannot open: " + full_path.string(), full_path, ErrorCodes::CANNOT_OPEN_FILE); + } + int count = default_readdir_count; + while (true) + { + std::vector direntsInfo(count); + std::memset(direntsInfo.data(), 0, count * sizeof(cfs_dirent_info)); + int num_entries = sdk_loader->cfsReaddir(id, fd, {direntsInfo.data(), count, count}, count); + if (num_entries < 0) + { + throwFromErrnoWithPath("Cannot readdir: " + full_path.string(), full_path, ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR); + } + if (num_entries < count) + { + for (int i = 0; i < num_entries; i++) + { + std::string file_name(direntsInfo[i].name); + file_names.emplace_back(file_name); + } + sdk_loader->cfsClose(id, fd); + return; + } + count = count * 2; + } +} + + +void DiskCubeFS::removeFile(const String & path) +{ + fs::path full_path = (fs::path(settings->disk_path) / path); + if (!fileExists(path)) + { + throwFromErrnoWithPath("File does not exists: " + full_path.string(), full_path, ErrorCodes::FILE_DOESNT_EXIST); + } + int result = 
sdk_loader->cfsUnlink(id, const_cast(full_path.c_str())); + if (result != 0) + { + throwFromErrnoWithPath("Cannot unlink file: " + full_path.string(), full_path, ErrorCodes::CANNOT_UNLINK); + } +} + +void loadCubeFSChangeableConfig(const Poco::Util::AbstractConfiguration & config, + const String & config_prefix, + UInt64 & keep_free_space_bytes){ + bool has_space_ratio = config.has(config_prefix + ".keep_free_space_ratio"); if (config.has(config_prefix + ".keep_free_space_bytes") && has_space_ratio) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Only one of 'keep_free_space_bytes' and 'keep_free_space_ratio' can be specified"); keep_free_space_bytes = config.getUInt64(config_prefix + ".keep_free_space_bytes", 0); + } +void DiskCubeFS::moveDirectory(const String & from_path, const String & to_path) +{ + moveFile(from_path, to_path); } class DiskCubeFSDirectoryIterator final : public IDiskDirectoryIterator { public: - DiskCubeFSDirectoryIterator() = default; - DiskCubeFSDirectoryIterator(const String & disk_path_, const String & dir_path_) - : dir_path(dir_path_), entry(fs::path(disk_path_) / dir_path_) + DiskCubeFSDirectoryIterator() : id(-1), fd(-1), current_index(0) { } + DiskCubeFSDirectoryIterator(int64_t client_id, const String & path, std::shared_ptr sdk_loader_) + : id(client_id), dir_path(path), fd(-1), current_index(0), sdk_loader(sdk_loader_) { + openDirectory(); } - - void next() override { ++entry; } - - bool isValid() const override { return entry != fs::directory_iterator(); } - + ~DiskCubeFSDirectoryIterator() override { closeDirectory(); } + void next() override { ++current_index; } + bool isValid() const override { return current_index < dirents.size(); } String path() const override { - if (entry->is_directory()) - return dir_path / entry->path().filename() / ""; + if (isValid()) + return (fs::path(dir_path) / dirents[current_index]).string(); else - return dir_path / entry->path().filename(); + return ""; + } + String name() const override + { + if 
(isValid()) + return dirents[current_index]; + else + return ""; } - - - String name() const override { return entry->path().filename(); } private: - fs::path dir_path; - fs::directory_iterator entry; + int64_t id; + String dir_path; + int fd; + std::vector dirents; + size_t current_index; + std::shared_ptr sdk_loader; + void openDirectory() + { + fd = sdk_loader->cfsOpen(id, const_cast(dir_path.c_str()), O_RDONLY | O_DIRECTORY, 0755); + if (fd < 0) + { + throwFromErrnoWithPath("Cannot open: " + dir_path, fs::path(dir_path), ErrorCodes::CANNOT_OPEN_FILE); + } + int count = default_readdir_count; + while (true) + { + std::vector direntsInfo(count); + std::memset(direntsInfo.data(), 0, count * sizeof(cfs_dirent_info)); + int num_entries = sdk_loader->cfsReaddir(id, fd, {direntsInfo.data(), count, count}, count); + if (num_entries < 0) + { + throwFromErrnoWithPath("Cannot readdir: " + dir_path, fs::path(dir_path), ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR); + } + if (num_entries < count) + { + for (int i = 0; i < num_entries; i++) + { + std::string file_name(direntsInfo[i].name); + dirents.emplace_back(file_name); + } + return; + } + count = count * 2; + } + } + void closeDirectory() + { + if (fd > 0) + sdk_loader->cfsClose(id, fd); + } }; - DiskDirectoryIteratorPtr DiskCubeFS::iterateDirectory(const String & path) { - fs::path meta_path = fs::path(disk_path) / path; - if (!broken && fs::exists(meta_path) && fs::is_directory(meta_path)) - return std::make_unique(disk_path, path); + fs::path meta_path = fs::path(settings->disk_path) / path; + if (!broken && directoryExists(meta_path)) + return std::make_unique(id, meta_path.string(), sdk_loader); else return std::make_unique(); } +void DiskCubeFS::createFile(const String & path) +{ + fs::path parent_dir = getParentPath(path); + if (!exists(parent_dir)) + { + throwFromErrnoWithPath( + "Parent directory does not exist [" + parent_dir.string() + "]", parent_dir, ErrorCodes::DIRECTORY_DOESNT_EXIST); + } + fs::path 
full_path = fs::path(settings->disk_path) / path; + int fd = sdk_loader->cfsOpen( + id, + const_cast(full_path.string().c_str()), + O_WRONLY | O_CREAT | O_EXCL, + S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH); + if (fd < 0) + { + throwFromErrnoWithPath("Cannot open: " + full_path.string(), full_path, ErrorCodes::CANNOT_OPEN_FILE); + } + sdk_loader->cfsClose(id, fd); +} -void DiskCubeFS::applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap &) +void DiskCubeFS::replaceFile(const String & from_path, const String & to_path) { - String new_disk_path; - UInt64 new_keep_free_space_bytes; + fs::path parent_dir = getParentPath(to_path); + if (!exists(parent_dir)) + { + throwFromErrnoWithPath( + "Destination directory does not exist [" + parent_dir.string() + "]", parent_dir, ErrorCodes::DIRECTORY_DOESNT_EXIST); + } + fs::path full_from_path = fs::path(settings->disk_path) / from_path; + fs::path full_to_path = fs::path(settings->disk_path) / to_path; + int result + = sdk_loader->cfsRename(id, const_cast(full_from_path.string().c_str()), const_cast(full_to_path.string().c_str())); + if (result != 0) + { + throwFromErrnoWithPath( + "Failed to rename file, from path: " + full_from_path.string() + " to path: " + full_to_path.string(), + "", + ErrorCodes::ATOMIC_RENAME_FAIL); + } +} - loadDiskCubeFSConfig(name, config, config_prefix, context, new_disk_path, new_keep_free_space_bytes); +void DiskCubeFS::moveFile(const String & from_path, const String & to_path) +{ + if (exists(to_path)) + { + throwFromErrnoWithPath( + "Destination file already exists [" + (fs::path(settings->disk_path) / to_path).string() + "]", + fs::path(settings->disk_path) / to_path, + ErrorCodes::FILE_ALREADY_EXISTS); + } + replaceFile(from_path, to_path); +} - if (disk_path != new_disk_path) - throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG, "Disk path can't be updated from config {}", name); +void 
DiskCubeFS::removeFileIfExists(const String & path) +{ + if (fileExists(path)) + { + auto fs_path = fs::path(settings->disk_path) / path; + if (0 != sdk_loader->cfsUnlink(id, const_cast(fs_path.c_str()))) + throwFromErrnoWithPath("Cannot unlink file " + fs_path.string(), fs_path, ErrorCodes::CANNOT_UNLINK); + } +} - if (keep_free_space_bytes != new_keep_free_space_bytes) - keep_free_space_bytes = new_keep_free_space_bytes; +void DiskCubeFS::removeDirectory(const String & path) +{ + auto fs_path = fs::path(settings->disk_path) / path; + if (0 != sdk_loader->cfsRmdir(id, const_cast(fs_path.c_str()))) + { + int error_code = errno; + std::string error_message = strerror(error_code); + std::string full_error_message = "Cannot rmdir " + fs_path.string() + ": " + error_message; + throwFromErrnoWithPath(full_error_message, fs_path, ErrorCodes::CANNOT_RMDIR); + } } -DiskCubeFS::DiskCubeFS(const String & name_, const String & path_, UInt64 keep_free_space_bytes_) - : DiskLocal(name_, path_, keep_free_space_bytes_) +void DiskCubeFS::removeRecursive(const String & path) { - logger = &Poco::Logger::get("DiskCubeFS"); + cfs_stat_info stat = getFileAttributes(path); + if (S_ISDIR(stat.mode)) + { + std::vector file_names; + listFiles(path, file_names); + for (const auto & filename : file_names) + { + auto child_path = fs::path(path) / filename; + removeRecursive(child_path.string()); + } + removeDirectory(path); + } + else + { + removeFile(path); + } } -DiskCubeFS::DiskCubeFS( - const String & name_, const String & path_, UInt64 keep_free_space_bytes_, [[maybe_unused]] ContextPtr context, [[maybe_unused]] UInt64 local_disk_check_period_ms) - : DiskCubeFS(name_, path_, keep_free_space_bytes_) +//warning: The file stat information is cached locally and may not be updated immediately +void DiskCubeFS::setLastModified(const String & path, const Poco::Timestamp & timestamp) { - // if (local_disk_check_period_ms > 0) - // disk_checker = std::make_unique(this, context, 
local_disk_check_period_ms); + constexpr uint32_t AttrModifyTime = 1 << 3; + constexpr uint32_t AttrAccessTime = 1 << 4; + cfs_stat_info stat = getFileAttributes(path); + stat.atime = timestamp.epochTime(); + stat.mtime = timestamp.epochTime(); + fs::path full_path = fs::path(settings->disk_path) / path; + if (sdk_loader->cfsSetattr(id, const_cast(full_path.string().c_str()), &stat, AttrModifyTime | AttrAccessTime) != 0) + { + throwFromErrnoWithPath("Cannot setattr " + full_path.string(), full_path, ErrorCodes::LOGICAL_ERROR); + } } +Poco::Timestamp DiskCubeFS::getLastModified(const String & path) +{ + return Poco::Timestamp::fromEpochTime(getLastChanged(path)); +} -DiskCubeFS::~DiskCubeFS() { +time_t DiskCubeFS::getLastChanged(const String & path) const +{ + cfs_stat_info stat = getFileAttributes(path); + return stat.ctime; +} +void DiskCubeFS::setReadOnly(const String & path) +{ + cfs_stat_info stat = getFileAttributes(path); + fs::path full_path = fs::path(settings->disk_path) / path; + int fd = sdk_loader->cfsOpen(id, const_cast(full_path.string().c_str()), 0, 0); + if (fd < 0) + { + throwFromErrnoWithPath("Cannot open: " + full_path.string(), full_path, ErrorCodes::CANNOT_OPEN_FILE); + } + mode_t newMode = stat.mode & ~(S_IWUSR | S_IWGRP | S_IWOTH); + if (sdk_loader->cfsFchmod(id, fd, newMode) != 0) + { + throwFromErrnoWithPath("Cannot fchmod " + full_path.string(), full_path, ErrorCodes::LOGICAL_ERROR); + } } +void DiskCubeFS::createHardLink(const String &, const String &) +{ + throwFromErrnoWithPath("DiskCubeFs does not support function createHardLink!", settings->disk_path, ErrorCodes::LOGICAL_ERROR); } +DiskCubeFS::DiskCubeFS(const String & name_, SettingsPtr settings_, UInt64 keep_free_space_bytes_) + : name(name_), settings(std::move(settings_)), logger(&Poco::Logger::get("DiskCubeFS")),keep_free_space_bytes(keep_free_space_bytes_) +{ + sdk_loader = std::make_shared(settings->lib_path); + id = sdk_loader->cfsNewClient(); + if (id <= 0) + { + throw 
Exception(ErrorCodes::LOGICAL_ERROR, "Failed to create client"); + } + setClientInfo("volName", const_cast(settings->vol_name.data())); + setClientInfo("masterAddr", const_cast(settings->master_addr.data())); + setClientInfo("logDir", const_cast(settings->log_dir.data())); + setClientInfo("logLevel", const_cast(settings->log_level.data())); + setClientInfo("accessKey", const_cast(settings->access_key.data())); + setClientInfo("secretKey", const_cast(settings->secret_key.data())); + setClientInfo("pushAddr", const_cast(settings->push_addr.data())); + if (sdk_loader->cfsStartClient(id) != 0) + { + throwFromErrnoWithPath("Start cfs client failed", "", ErrorCodes::LOGICAL_ERROR); + } + createDirectories(""); +} + +DiskCubeFS::DiskCubeFS(const String & name_, ContextPtr, SettingsPtr settings_, UInt64 keep_free_space_bytes_) + : DiskCubeFS(name_, settings_, keep_free_space_bytes_) +{ +} + +void DiskCubeFS::getClientId() +{ + id = sdk_loader->cfsNewClient(); + if (id <= 0) + { + throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to create client"); + } +} + +void DiskCubeFS::setClientInfo(const char * key, char * value) +{ + if (sdk_loader->cfsSetClient(id, strdup(key), value) != 0) + { + sdk_loader->cfsCloseClient(id); + throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to set client info"); + } +} + +DiskCubeFSSettings ::DiskCubeFSSettings( + String vol_name_, + String master_addr_, + String log_dir_, + String log_level_, + String access_key_, + String secret_key_, + String push_addr_, + String disk_path_, + String lib_path_) + : vol_name(vol_name_) + , master_addr(master_addr_) + , log_dir(log_dir_) + , log_level(log_level_) + , access_key(access_key_) + , secret_key(secret_key_) + , push_addr(push_addr_) + , disk_path(disk_path_) + , lib_path(lib_path_) +{ +} + +bool DiskCubeFS::canRead(const std::string & path) +{ + cfs_stat_info stat = getFileAttributes(path); + if (stat.uid == geteuid()) + return (stat.mode & S_IRUSR) != 0; + else if (stat.gid == getegid()) + 
return (stat.mode & S_IRGRP) != 0; + else + return (stat.mode & S_IROTH) != 0 || geteuid() == 0; +} + +bool DiskCubeFS::exists(const String & path) const +{ + fs::path full_path = fs::path(settings->disk_path) / path; + cfs_stat_info stat; + int result = sdk_loader->cfsGetattr(id, const_cast(full_path.string().c_str()), &stat); + return (result == 0); +} + +bool DiskCubeFS::fileExists(const String & path) const +{ + fs::path full_path = fs::path(settings->disk_path) / path; + cfs_stat_info stat; + int result = sdk_loader->cfsGetattr(id, const_cast(full_path.string().c_str()), &stat); + return (result == 0) && (S_ISREG(stat.mode)); +} + +bool DiskCubeFS::directoryExists(const String & path) const +{ + fs::path full_path = fs::path(settings->disk_path) / path; + cfs_stat_info stat; + int result = sdk_loader->cfsGetattr(id, const_cast(full_path.string().c_str()), &stat); + return (result == 0) && (S_ISDIR(stat.mode)); +} + +std::optional DiskCubeFS::fileSizeSafe(const fs::path & path) +{ + cfs_stat_info stat; + int result = sdk_loader->cfsGetattr(id, const_cast(path.c_str()), &stat); + if (result == 0) + { + return stat.size; + } + else if (result == -ENOENT || result == -ENOTDIR) + { + return std::nullopt; + } + else + { + throw std::runtime_error("Failed to get file size: " + path.string()); + } +} + +std::unique_ptr +DiskCubeFS::readFile(const String & path, const ReadSettings &, std::optional, std::optional) const +{ + if (!fileExists(path)) + { + fs::path full_path = fs::path(settings->disk_path) / path; + throwFromErrnoWithPath("File does not exists: " + full_path.string(), full_path, ErrorCodes::FILE_DOESNT_EXIST); + } + return std::make_unique(id, fs::path(settings->disk_path) / path, sdk_loader, O_RDONLY | O_CLOEXEC); +} + +// std::optional DiskCubeFS::readDiskCheckerMagicNumber() const noexcept +// { +// try +// { +// auto buf = readFile(disk_checker_path); +// UInt32 magic_number; +// readIntBinary(magic_number, *buf); +// if (buf->eof()) +// return 
magic_number; +// LOG_WARNING(logger, "The size of disk check magic number is more than 4 bytes. Mark it as read failure"); +// return {}; +// } +// catch (...) +// { +// tryLogCurrentException( +// logger, fmt::format("Cannot read correct disk check magic number from from {}{}", settings->disk_path, disk_checker_path)); +// return {}; +// } +// } + +void DiskCubeFS::applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr , const String & config_prefix, const DisksMap &) +{ + UInt64 new_keep_free_space_bytes; + loadCubeFSChangeableConfig(config, config_prefix, new_keep_free_space_bytes); + if (keep_free_space_bytes != new_keep_free_space_bytes) + keep_free_space_bytes = new_keep_free_space_bytes; +} + +std::unique_ptr DiskCubeFS::writeFile(const String & path, size_t buf_size, WriteMode mode) +{ + int flags = (mode == WriteMode::Append) ? (O_APPEND | O_CREAT | O_WRONLY) : -1; + return std::make_unique(id, fs::path(settings->disk_path) / path, sdk_loader, buf_size, flags); +} + +DiskCubeFS::~DiskCubeFS() { + +} + +} diff --git a/src/Disks/CubeFS/DiskCubeFS.h b/src/Disks/CubeFS/DiskCubeFS.h index da8042ee8e8f..d1add66048c9 100644 --- a/src/Disks/CubeFS/DiskCubeFS.h +++ b/src/Disks/CubeFS/DiskCubeFS.h @@ -1,54 +1,121 @@ #pragma once -#include -#include -#include -#include -#include +#include #include - -namespace ErrorCodes -{ - extern const int BAD_ARGUMENTS; -} +#include "sdkLoader.h" namespace DB { +struct DiskCubeFSSettings +{ + DiskCubeFSSettings( + String vol_name_, + String master_addr_, + String log_dir_, + String log_level_, + String access_key_, + String secret_key_, + String push_addr_, + String disk_path_, + String lib_path_); + String vol_name; + String master_addr; + String log_dir; + String log_level; + String access_key; + String secret_key; + String push_addr; + String disk_path; + String lib_path; +}; class DiskCubeFSCheckThread; -class DiskCubeFS : public DiskLocal +class DiskCubeFS final : public IDisk { public: - friend class 
DiskCubeFSCheckThread; +friend class DiskCubeFSCheckThread; friend class DiskCubeFSReservation; + using SettingsPtr = std::shared_ptr; - DiskCubeFS(const String & name_, const String & path_, UInt64 keep_free_space_bytes_); - DiskCubeFS( - const String & name_, - const String & path_, - UInt64 keep_free_space_bytes_, - ContextPtr context, - UInt64 local_disk_check_period_ms); + DiskCubeFS(const String & name_, SettingsPtr settings_, UInt64 keep_free_space_bytes_); + DiskCubeFS(const String & name_, ContextPtr context, SettingsPtr settings_, UInt64 keep_free_space_bytes_); virtual ~DiskCubeFS() override; - + std::unique_ptr + readFile(const String & path, const ReadSettings & settings, std::optional read_hint, std::optional file_size) + const override; + std::unique_ptr writeFile(const String & path, size_t buf_size, WriteMode mode) override; + bool exists(const String & path) const override; + const String & getName() const override { return name; } + const String & getPath() const override { return settings->disk_path; } + ReservationPtr reserve(UInt64 bytes) override; + UInt64 getTotalSpace() const override; + UInt64 getUnreservedSpace() const override; + bool isFile(const String & path) const override; + bool isDirectory(const String & path) const override; + size_t getFileSize(const String & path) const override; + //differs from createDirectories, an error will be reported if the parent directory does not exist. + void createDirectory(const String & path) override; + void createDirectories(const String & path) override; + /* + Delete all files under the folder + Empty subfolders can be deleted, but if there are files in the subfolders, an error will be reported.. + */ + void clearDirectory(const String & path) override; + /* + Follow fs::rename semantics. If the upper-level directory on the destination does not exist, an error will be reported. + By modifying the cubefs code, the cfs_rename method supports overwriting the destination file. 
+ */ + void moveDirectory(const String & from_path, const String & to_path) override; + //differs from replaceFile, will not overwrite the destination file + void moveFile(const String & from_path, const String & to_path) override; + void replaceFile(const String & from_path, const String & to_path) override; DiskDirectoryIteratorPtr iterateDirectory(const String & path) override; + //If the parent directory does not exist, an error will be reported. + void createFile(const String & path) override; + void listFiles(const String & path, std::vector & file_names) override; + void removeFile(const String & path) override; + void removeFileIfExists(const String & path) override; + void removeDirectory(const String & path) override; + void setLastModified(const String & path, const Poco::Timestamp & timestamp) override; + Poco::Timestamp getLastModified(const String & path) override; + time_t getLastChanged(const String & path) const override; + void setReadOnly(const String & path) override; + void createHardLink(const String & src_path, const String & dst_path) override; + DiskType getType() const override { return DiskType::CubeFS; } + bool isRemote() const override { return true; } + bool supportZeroCopyReplication() const override { return false; } + void removeRecursive(const String & path) override; + UInt64 getAvailableSpace() const override; void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap &) override; bool supportDataSharing() const override { return true; } - bool isRemote() const override { return false; } - bool supportZeroCopyReplication() const override { return false; } +private: + bool fileExists(const String & path) const; + bool directoryExists(const String & path) const; + cfs_stat_info getFileAttributes(const String & relative_path) const; + bool canRead(const std::string & path); + std::optional fileSizeSafe(const fs::path & path); + bool tryReserve(UInt64 
bytes); + void getClientId(); + void setClientInfo(const char * key, char * value); + fs::path getParentPath(const String & path); + + const String name; + SettingsPtr settings; + Poco::Logger * logger; + static std::mutex reservation_mutex; + std::atomic keep_free_space_bytes; + std::atomic broken{false}; + std::atomic readonly{false}; + UInt64 reserved_bytes = 0; + //const String disk_checker_path = ".disk_checker_file"; + UInt64 reservation_count = 0; + int64_t id; + std::shared_ptr sdk_loader; }; -extern void loadDiskCubeFSConfig(const String & name, - const Poco::Util::AbstractConfiguration & config, +extern void loadCubeFSChangeableConfig(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, - ContextPtr context, - String & path, UInt64 & keep_free_space_bytes); -using DiskCubeFSPtr = std::shared_ptr; - } - - - diff --git a/src/Disks/CubeFS/DiskCubeFSCheckThread.cpp b/src/Disks/CubeFS/DiskCubeFSCheckThread.cpp deleted file mode 100644 index 7e1cb683c313..000000000000 --- a/src/Disks/CubeFS/DiskCubeFSCheckThread.cpp +++ /dev/null @@ -1,70 +0,0 @@ -#include - -#include -#include -#include - -namespace DB -{ -static const auto DISK_CHECK_ERROR_SLEEP_MS = 1000; -static const auto DISK_CHECK_ERROR_RETRY_TIME = 3; - -DiskCubeFSCheckThread::DiskCubeFSCheckThread(DiskCubeFS * disk_, ContextPtr context_, UInt64 local_disk_check_period_ms) - : WithContext(context_) - , disk(std::move(disk_)) - , check_period_ms(local_disk_check_period_ms) - , log(&Poco::Logger::get(fmt::format("DiskCubeFSCheckThread({})", disk->getName()))) -{ - task = getContext()->getSchedulePool().createTask(log->name(), [this] { run(); }); -} - -void DiskCubeFSCheckThread::startup() -{ - need_stop = false; - retry = 0; - task->activateAndSchedule(); -} - -void DiskCubeFSCheckThread::run() -{ - if (need_stop) - return; - - bool can_read = disk->canRead(); - bool can_write = disk->canWrite(); - if (can_read) - { - if (disk->broken) - LOG_INFO(log, "Disk {0} seems to be fine. 
It can be recovered using `SYSTEM RESTART DISK {0}`", disk->getName()); - retry = 0; - if (can_write) - disk->readonly = false; - else - { - disk->readonly = true; - LOG_INFO(log, "Disk {} is readonly", disk->getName()); - } - task->scheduleAfter(check_period_ms); - } - else if (!disk->broken && retry < DISK_CHECK_ERROR_RETRY_TIME) - { - ++retry; - task->scheduleAfter(DISK_CHECK_ERROR_SLEEP_MS); - } - else - { - retry = 0; - disk->broken = true; - LOG_INFO(log, "Disk {} is broken", disk->getName()); - task->scheduleAfter(check_period_ms); - } -} - -void DiskCubeFSCheckThread::shutdown() -{ - need_stop = true; - task->deactivate(); - LOG_TRACE(log, "DiskCubeFSCheck thread finished"); -} - -} diff --git a/src/Disks/CubeFS/DiskCubeFSCheckThread.h b/src/Disks/CubeFS/DiskCubeFSCheckThread.h deleted file mode 100644 index ca471558a1de..000000000000 --- a/src/Disks/CubeFS/DiskCubeFSCheckThread.h +++ /dev/null @@ -1,39 +0,0 @@ -#pragma once - -#include -#include - -namespace Poco -{ -class Logger; -} - -namespace DB -{ -class DiskCubeFS; - -class DiskCubeFSCheckThread : WithContext -{ -public: - friend class DiskCubeFS; - - DiskCubeFSCheckThread(DiskCubeFS * disk_, ContextPtr context_, UInt64 local_disk_check_period_ms); - - void startup(); - - void shutdown(); - -private: - bool check(); - void run(); - - DiskCubeFS * disk; - size_t check_period_ms; - Poco::Logger * log; - std::atomic need_stop{false}; - - BackgroundSchedulePool::TaskHolder task; - size_t retry{}; -}; - -} diff --git a/src/Disks/CubeFS/registerDiskCubeFS.cpp b/src/Disks/CubeFS/registerDiskCubeFS.cpp index 125f53d48b52..77286c4093ca 100644 --- a/src/Disks/CubeFS/registerDiskCubeFS.cpp +++ b/src/Disks/CubeFS/registerDiskCubeFS.cpp @@ -1,35 +1,60 @@ -#include -#include -#include -#include - +#include +#include "DiskCubeFS.h" +#include "Disks/DiskFactory.h" +#include "Disks/DiskRestartProxy.h" namespace DB { +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; + extern const int BAD_ARGUMENTS; +} 
+DiskCubeFS::SettingsPtr +getSettings(const String & name, ContextPtr context, const Poco::Util::AbstractConfiguration & config, const String & config_prefix) +{ + String disk_path = config.getString(config_prefix + ".disk_path"); + if (disk_path.empty()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Disk path can not be empty. Disk {}", name); + if (disk_path.back() != '/') + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Disk path must end with /. Disk {}", name); + if (disk_path == context->getPath()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Disk path ('{}') cannot be equal to . Use disk instead.", disk_path); + LOG_DEBUG(&Poco::Logger::get("DiskCubeFS"), "volumn name: {}", config.getString(config_prefix + ".vol_name")); + LOG_DEBUG(&Poco::Logger::get("DiskCubeFS"), "master address: {}", config.getString(config_prefix + ".master_addr")); + LOG_DEBUG(&Poco::Logger::get("DiskCubeFS"), "log directory: {}", config.getString(config_prefix + ".log_dir")); + LOG_DEBUG(&Poco::Logger::get("DiskCubeFS"), "log level: {}", config.getString(config_prefix + ".log_level")); + LOG_DEBUG(&Poco::Logger::get("DiskCubeFS"), "access key: {}", config.getString(config_prefix + ".access_key")); + LOG_DEBUG(&Poco::Logger::get("DiskCubeFS"), "secret key: {}", config.getString(config_prefix + ".secret_key")); + LOG_DEBUG(&Poco::Logger::get("DiskCubeFS"), "push address: {}", config.getString(config_prefix + ".push_addr")); + LOG_DEBUG(&Poco::Logger::get("DiskCubeFS"), "disk path: {}", disk_path); + LOG_DEBUG(&Poco::Logger::get("DiskCubeFS"), "lib path: {}", config.getString(config_prefix + ".lib_path")); + return std::make_shared( + config.getString(config_prefix + ".vol_name"), + config.getString(config_prefix + ".master_addr"), + config.getString(config_prefix + ".log_dir"), + config.getString(config_prefix + ".log_level", "debug"), + config.getString(config_prefix + ".access_key"), + config.getString(config_prefix + ".secret_key"), + config.getString(config_prefix + ".push_addr"), + 
disk_path, + config.getString(config_prefix + ".lib_path")); +} void registerDiskCubeFS(DiskFactory & factory) { - auto creator = []( - const String & name, - const Poco::Util::AbstractConfiguration & config, - const String & config_prefix, - ContextPtr context, - const DisksMap & map) -> DiskPtr - { - String path; - UInt64 keep_free_space_bytes; - loadDiskCubeFSConfig(name, config, config_prefix, context, path, keep_free_space_bytes); - - for (const auto & [disk_name, disk_ptr] : map) - if (path == disk_ptr->getPath()) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Disk {} and disk {} cannot have the same path ({})", name, disk_name, path); - - DiskPtr disk - = std::make_shared(name, path, keep_free_space_bytes, context, config.getUInt("local_disk_check_period_ms", 0)); - disk->startup(); - return disk; - }; - + auto creator = [](const String & name, + const Poco::Util::AbstractConfiguration & config, + const String & config_prefix, + ContextPtr context, + const DisksMap & /*map*/) -> DiskPtr + { + auto settings = getSettings(name, context, config, config_prefix); + UInt64 keep_free_space_bytes; + loadCubeFSChangeableConfig(config, config_prefix, keep_free_space_bytes); + DiskPtr cubefsDisk = std::make_shared(name, context, settings, keep_free_space_bytes); + return cubefsDisk; + }; factory.registerDiskType("cubefs", creator); } diff --git a/src/Disks/CubeFS/sdkLoader.cpp b/src/Disks/CubeFS/sdkLoader.cpp new file mode 100644 index 000000000000..188f47e81d4a --- /dev/null +++ b/src/Disks/CubeFS/sdkLoader.cpp @@ -0,0 +1,48 @@ +#include "sdkLoader.h" +#include +#include +namespace DB +{ +namespace ErrorCodes +{ + extern const int EXTERNAL_LIBRARY_ERROR; +} +SdkLoader::SdkLoader(String lib_path_) : lib_path(lib_path_), logger(&Poco::Logger::get("sdkLoader")) +{ + LOG_DEBUG(logger, "load cubefs shared library from path: {}", lib_path_); + void * handle = dlopen(lib_path_.c_str(), RTLD_LAZY); + if (!handle) + { + throwFromErrnoWithPath("Failed to open the dynamic 
library", lib_path_, ErrorCodes::EXTERNAL_LIBRARY_ERROR); + } + cfsNewClient = reinterpret_cast(dlsym(handle, "cfs_new_client")); + cfsSetClient = reinterpret_cast(dlsym(handle, "cfs_set_client")); + cfsStartClient = reinterpret_cast(dlsym(handle, "cfs_start_client")); + cfsCloseClient = reinterpret_cast(dlsym(handle, "cfs_close_client")); + cfsChdir = reinterpret_cast(dlsym(handle, "cfs_chdir")); + cfsGetcwd = reinterpret_cast(dlsym(handle, "cfs_getcwd")); + cfsGetattr = reinterpret_cast(dlsym(handle, "cfs_getattr")); + cfsSetattr = reinterpret_cast(dlsym(handle, "cfs_setattr")); + cfsOpen = reinterpret_cast(dlsym(handle, "cfs_open")); + cfsFlush = reinterpret_cast(dlsym(handle, "cfs_flush")); + cfsClose = reinterpret_cast(dlsym(handle, "cfs_close")); + cfsLsdir = reinterpret_cast(dlsym(handle, "cfs_lsdir")); + cfsMkdirs = reinterpret_cast(dlsym(handle, "cfs_mkdirs")); + cfsRmdir = reinterpret_cast(dlsym(handle, "cfs_rmdir")); + cfsUnlink = reinterpret_cast(dlsym(handle, "cfs_unlink")); + cfsRename = reinterpret_cast(dlsym(handle, "cfs_rename")); + cfsFchmod = reinterpret_cast(dlsym(handle, "cfs_fchmod")); + cfsGetsummary = reinterpret_cast(dlsym(handle, "cfs_getsummary")); + cfsWrite = reinterpret_cast(dlsym(handle, "cfs_write")); + cfsRead = reinterpret_cast(dlsym(handle, "cfs_read")); + cfsReaddir = reinterpret_cast(dlsym(handle, "cfs_readdir")); + + if (!cfsNewClient || !cfsSetClient || !cfsStartClient || !cfsCloseClient || !cfsChdir || !cfsGetcwd || !cfsGetattr || !cfsSetattr + || !cfsOpen || !cfsFlush || !cfsClose || !cfsWrite || !cfsRead || !cfsLsdir || !cfsMkdirs || !cfsRmdir || !cfsUnlink || !cfsRename + || !cfsFchmod || !cfsGetsummary || !cfsReaddir) + { + dlclose(handle); + throwFromErrnoWithPath("Failed to find one or more functions", "", ErrorCodes::EXTERNAL_LIBRARY_ERROR); + } +} +} diff --git a/src/Disks/CubeFS/sdkLoader.h b/src/Disks/CubeFS/sdkLoader.h new file mode 100644 index 000000000000..77c35527fe1b --- /dev/null +++ 
b/src/Disks/CubeFS/sdkLoader.h @@ -0,0 +1,63 @@ +#pragma once +#include +#include +#include + +namespace DB +{ +class SdkLoader +{ +public: + SdkLoader(String lib_path_); + + typedef int64_t (*CfsNewClientFunction)(); + typedef int (*CfsSetClientFunction)(int64_t, char *, char *); + typedef int (*CfsStartClientFunction)(int64_t); + typedef void (*CfsCloseClientFunction)(int64_t); + typedef int (*CfsChdirFunction)(int64_t, char *); + typedef char * (*CfsGetcwdFunction)(int64_t); + typedef int (*CfsGetattrFunction)(int64_t, char *, struct cfs_stat_info *); + typedef int (*CfsSetattrFunction)(int64_t, char *, struct cfs_stat_info *, int); + typedef int (*CfsOpenFunction)(int64_t, char *, int, mode_t); + typedef int (*CfsFlushFunction)(int64_t, int); + typedef void (*CfsCloseFunction)(int64_t, int); + typedef ssize_t (*CfsWriteFunction)(int64_t, int, void *, size_t, off_t); + typedef ssize_t (*CfsReadFunction)(int64_t, int, void *, size_t, off_t); + //typedef int (*CfsBatchGetInodesFunction)(int64_t, int, void *, GoSlice, int); + //typedef int (*CfsRefreshSummaryFunction)(int64_t, char *, int); + typedef int (*CfsReaddirFunction)(int64_t, int, GoSlice, int); + typedef int (*CfsLsdirFunction)(int64_t, int, GoSlice, int); + typedef int (*CfsMkdirsFunction)(int64_t, char *, mode_t); + typedef int (*CfsRmdirFunction)(int64_t, char *); + typedef int (*CfsUnlinkFunction)(int64_t, char *); + typedef int (*CfsRenameFunction)(int64_t, char *, char *); + typedef int (*CfsFchmodFunction)(int64_t, int, mode_t); + typedef int (*CfsGetsummaryFunction)(int64_t, char *, struct cfs_summary_info *, char *, int); + + CfsNewClientFunction cfsNewClient; + CfsSetClientFunction cfsSetClient; + CfsStartClientFunction cfsStartClient; + CfsCloseClientFunction cfsCloseClient; + CfsChdirFunction cfsChdir; + CfsGetcwdFunction cfsGetcwd; + CfsGetattrFunction cfsGetattr; + CfsSetattrFunction cfsSetattr; + CfsOpenFunction cfsOpen; + CfsFlushFunction cfsFlush; + CfsCloseFunction cfsClose; + 
CfsLsdirFunction cfsLsdir; + CfsMkdirsFunction cfsMkdirs; + CfsRmdirFunction cfsRmdir; + CfsUnlinkFunction cfsUnlink; + CfsRenameFunction cfsRename; + CfsFchmodFunction cfsFchmod; + CfsGetsummaryFunction cfsGetsummary; + CfsWriteFunction cfsWrite; + CfsReadFunction cfsRead; + CfsReaddirFunction cfsReaddir; + +private: + String lib_path; + Poco::Logger * logger; +}; +} diff --git a/src/Disks/DiskLocal.cpp b/src/Disks/DiskLocal.cpp index a1639e96d108..600c6adc4780 100644 --- a/src/Disks/DiskLocal.cpp +++ b/src/Disks/DiskLocal.cpp @@ -15,11 +15,11 @@ #include #include #include -#include #include #include #include #include +#include namespace CurrentMetrics { @@ -371,9 +371,61 @@ void DiskLocal::removeDirectory(const String & path) throwFromErrnoWithPath("Cannot rmdir " + fs_path.string(), fs_path, ErrorCodes::CANNOT_RMDIR); } +void DiskLocal::remove_all_for_test(const String & path) +{ + LOG_DEBUG(log, "mock remove_all path: {}", path); + if (path.find("/mnt/cfs") != 0) + { + fs::remove_all(fs::path(path)); + return; + } + for (const auto & entry : std::filesystem::directory_iterator(fs::path(path))) + { + if (entry.is_regular_file() || entry.is_directory()) + { + std::error_code ec = std::make_error_code(std::errc::directory_not_empty); + throw std::filesystem::filesystem_error("Directory not empty", entry.path(), ec); + } + } + fs::remove_all(path); +} + +void DiskLocal::renameFiles(const fs::path & source_dir, const fs::path & target_dir) +{ + if (!fs::exists(target_dir.parent_path())) + { + fs::create_directories(target_dir.parent_path()); + } + LOG_DEBUG(log, "move files from {} to {}", source_dir.string(), target_dir.string()); + fs::rename(source_dir, target_dir); +} + void DiskLocal::removeRecursive(const String & path) { - fs::remove_all(fs::path(disk_path) / path); + try + { + fs::remove_all(fs::path(disk_path) / path); + } + catch (const fs::filesystem_error & e) + { + if (e.code() == std::errc::directory_not_empty && handle_remove_error_path != "" && 
disk_path.find("/mnt/cfs") == 0) + { + LOG_DEBUG( + log, + "remove_all get directory not empty error, try to move, error: {}, disk path: {}, remove path: {}", + e.what(), + disk_path, + (fs::path(disk_path) / path).string()); + fs::path to_path = fs::path(handle_remove_error_path) / fs::relative(e.path1(), disk_path); + renameFiles(e.path1(), to_path); + if (fs::exists(fs::path(disk_path) / path)) + { + removeRecursive(path); + } + } + else + throw; + } } void DiskLocal::listFiles(const String & path, std::vector & file_names) @@ -421,9 +473,8 @@ void DiskLocal::createFile(const String & path) void DiskLocal::setReadOnly(const String & path) { - fs::permissions(fs::path(disk_path) / path, - fs::perms::owner_write | fs::perms::group_write | fs::perms::others_write, - fs::perm_options::remove); + fs::permissions( + fs::path(disk_path) / path, fs::perms::owner_write | fs::perms::group_write | fs::perms::others_write, fs::perm_options::remove); } bool inline isSameDiskType(const IDisk & one, const IDisk & another) @@ -469,10 +520,7 @@ void DiskLocal::applyNewSettings(const Poco::Util::AbstractConfiguration & confi } DiskLocal::DiskLocal(const String & name_, const String & path_, UInt64 keep_free_space_bytes_) - : name(name_) - , disk_path(path_) - , keep_free_space_bytes(keep_free_space_bytes_) - , logger(&Poco::Logger::get("DiskLocal")) + : name(name_), disk_path(path_), keep_free_space_bytes(keep_free_space_bytes_), logger(&Poco::Logger::get("DiskLocal")) { } @@ -482,6 +530,8 @@ DiskLocal::DiskLocal( { if (local_disk_check_period_ms > 0) disk_checker = std::make_unique(this, context, local_disk_check_period_ms); + const auto & settings = context->getSettingsRef(); + handle_remove_error_path = String(settings.handle_remove_error_path); } void DiskLocal::startup() @@ -672,6 +722,11 @@ void registerDiskLocal(DiskFactory & factory) std::shared_ptr disk = std::make_shared(name, path, keep_free_space_bytes, context, config.getUInt("local_disk_check_period_ms", 0)); + 
LOG_DEBUG( + &Poco::Logger::get("DiskLocal"), + "register local disk, name: {}, handle remove error path: {}", + name, + String(context->getSettingsRef().handle_remove_error_path)); disk->startup(); return std::make_shared(disk); }; diff --git a/src/Disks/DiskLocal.h b/src/Disks/DiskLocal.h index 216f0a31f477..d57fc46e8bda 100644 --- a/src/Disks/DiskLocal.h +++ b/src/Disks/DiskLocal.h @@ -22,11 +22,7 @@ class DiskLocal : public IDisk DiskLocal(const String & name_, const String & path_, UInt64 keep_free_space_bytes_); DiskLocal( - const String & name_, - const String & path_, - UInt64 keep_free_space_bytes_, - ContextPtr context, - UInt64 local_disk_check_period_ms); + const String & name_, const String & path_, UInt64 keep_free_space_bytes_, ContextPtr context, UInt64 local_disk_check_period_ms); const String & getName() const override { return name; } @@ -128,10 +124,13 @@ class DiskLocal : public IDisk /// Read magic number from disk checker file. Return std::nullopt if exception happens. 
std::optional readDiskCheckerMagicNumber() const noexcept; + void remove_all_for_test(const String & path); + void renameFiles(const fs::path & source_dir, const fs::path & target_dir); const String name; const String disk_path; const String disk_checker_path = ".disk_checker_file"; + String handle_remove_error_path; std::atomic keep_free_space_bytes; Poco::Logger * logger; diff --git a/src/Disks/registerDisks.cpp b/src/Disks/registerDisks.cpp index 2fc4f6dfdac2..11b6cdfae64e 100644 --- a/src/Disks/registerDisks.cpp +++ b/src/Disks/registerDisks.cpp @@ -9,7 +9,6 @@ namespace DB void registerDiskLocal(DiskFactory & factory); void registerDiskMemory(DiskFactory & factory); - #if USE_AWS_S3 void registerDiskS3(DiskFactory & factory); #endif diff --git a/src/Disks/tests/gtest_disk_cubefs.cpp b/src/Disks/tests/gtest_disk_cubefs.cpp new file mode 100644 index 000000000000..cb603304c941 --- /dev/null +++ b/src/Disks/tests/gtest_disk_cubefs.cpp @@ -0,0 +1,256 @@ +#include +#include +#include +#include +#include +#define RUN_CUBEFS_TEST 1 +#if RUN_CUBEFS_TEST + +# include +# include +# include +# include +# include +# include + +using namespace DB; + +const String path = "/test_dir"; +const String vol_name = "xieyichen"; +const String master_addr = "cfs-south.oppo.local"; +const String log_dir = "/home/xieyichen/logs/cfs2/test-log"; +const String log_level = "debug"; +const String access_key = "jRlZO65q7XlH5bnV"; +const String secret_key = "V1m730UzREHaK1jCkC0kL0cewOX0kH3K"; +const String push_addr = "cfs.dg-push.wanyol.com"; +const String disk_name = "cubefs"; +const String lib_path = "/home/xieyichen/ClickHouse/libcfs.so"; + +class DiskTestCubeFS : public testing::Test +{ +public: + void SetUp() override + { + auto settings = std::make_shared( + vol_name, master_addr, log_dir, log_level, access_key, secret_key, push_addr, path, lib_path); + std::cout << "set client info successfully " << std::endl; + disk = std::make_shared(disk_name, settings, 0); + std::cout << "initial 
cubefs disk successfully " << std::endl; + } + void TearDown() override + { + disk->removeRecursive(""); + disk.reset(); + } + DB::DiskPtr disk; +}; + +TEST_F(DiskTestCubeFS, createDirectory) +{ + EXPECT_TRUE(this->disk->exists("/")); + std::cout << "empty exist is ok" << std::endl; + EXPECT_EQ(disk_name, this->disk->getName()); + + disk->createDirectory("create_directory"); + EXPECT_TRUE(this->disk->exists("create_directory")); + EXPECT_TRUE(this->disk->isDirectory("create_directory")); + EXPECT_FALSE(this->disk->isFile("create_directory")); + + this->disk->createDirectories("not_exist_parent_directory/subdirectory"); + EXPECT_TRUE(this->disk->isDirectory("not_exist_parent_directory/subdirectory")); + std::cout << "class disk path: " << disk->getPath() << std::endl; +} + + +TEST_F(DiskTestCubeFS, moveDirectory) +{ + disk->createDirectory("create_directory"); + disk->createDirectories("not_exist_parent_directory/subdirectory"); + try + { + std::cout << "try move directory" << std::endl; + this->disk->moveDirectory("not_exist_parent_directory/subdirectory", "not_exist/create_directory/"); + FAIL() << "Expected exception to be thrown."; + //EXPECT_THROW(cubeFSdisk->moveDirectory("not_exist_parent_directory/subdirectory", "not_exist/create_directory/"), std::exception); + } + catch (const std::exception & e) + { + std::string errorMessage = e.what(); + std::string expectedSubstring = "Destination directory does not exist"; + EXPECT_TRUE(errorMessage.find(expectedSubstring) != std::string::npos); + } + std::cout << "try move directory end" << std::endl; + this->disk->moveDirectory("not_exist_parent_directory/subdirectory", "create_directory/subdirectory"); + EXPECT_TRUE(this->disk->isDirectory("create_directory/subdirectory")); + std::cout << "move directory end" << std::endl; +} + +TEST_F(DiskTestCubeFS, createFile) +{ + try + { + disk->createFile("create_directory/file"); + FAIL() << "Expected exception to be thrown."; + } + catch (const std::exception & e) + { + 
std::string errorMessage = e.what(); + std::string expectedSubstring = "Parent directory does not exist"; + EXPECT_TRUE(errorMessage.find(expectedSubstring) != std::string::npos); + } + disk->createDirectory("create_directory"); + + this->disk->createFile("create_directory/file"); + EXPECT_TRUE(this->disk->isFile("create_directory/file")); + EXPECT_FALSE(this->disk->isDirectory("create_directory/file")); + std::cout << "create file end" << std::endl; +} + +TEST_F(DiskTestCubeFS, moveFile) +{ + disk->createDirectory("create_directory"); + disk->createFile("create_directory/file"); + this->disk->moveDirectory("create_directory", "move_directory"); + EXPECT_TRUE(this->disk->isFile("move_directory/file")); + std::cout << "move directory end" << std::endl; + this->disk->moveFile("move_directory/file", "move_file"); + EXPECT_TRUE(this->disk->isFile("move_file")); + disk->createDirectory("create_directory"); + this->disk->createFile("create_directory/create_file"); + try + { + this->disk->moveFile("move_file", "create_directory/create_file"); + FAIL() << "Expected exception to be thrown."; + } + catch (const std::exception & e) + { + std::string errorMessage = e.what(); + std::string expectedSubstring = "Destination file already exists"; + EXPECT_TRUE(errorMessage.find(expectedSubstring) != std::string::npos); + } + EXPECT_TRUE(disk->exists("create_directory/create_file")); + std::cout << "start replace file" << std::endl; + disk->replaceFile("move_file", "create_directory/create_file"); + EXPECT_TRUE(this->disk->isFile("create_directory/create_file")); +} + +TEST_F(DiskTestCubeFS, remove) +{ + try + { + disk->removeFile("not_exist"); + FAIL() << "Expected exception to be thrown."; + } + catch (const std::exception & e) + { + std::string errorMessage = e.what(); + std::string expectedSubstring = "File does not exists"; + EXPECT_TRUE(errorMessage.find(expectedSubstring) != std::string::npos); + } + disk->removeFileIfExists("not_exist"); +} + +/* The file stat information 
is cached locally and may not be updated immediately +TEST_F(DiskTestCubeFS, lastModified) +{ + disk->createFile("file"); + std::this_thread::sleep_for(std::chrono::seconds(1)); + Poco::Timestamp timestamp; + disk->setLastModified("file", timestamp); + EXPECT_EQ(timestamp.epochTime(), disk->getLastModified("file").epochTime()); +} +*/ + +TEST_F(DiskTestCubeFS, writeFile) +{ + { + std::unique_ptr out = this->disk->writeFile("test_file"); + writeString("test data", *out); + } + + DB::String data; + { + std::unique_ptr in = this->disk->readFile("test_file"); + readString(data, *in); + } + + EXPECT_EQ("test data", data); + EXPECT_EQ(data.size(), this->disk->getFileSize("test_file")); +} + +TEST_F(DiskTestCubeFS, readFile) +{ + try + { + std::unique_ptr in = this->disk->readFile("test_file"); + FAIL() << "Expected exception to be thrown."; + } + catch (const std::exception & e) + { + std::string errorMessage = e.what(); + std::string expectedSubstring = "File does not exists"; + EXPECT_TRUE(errorMessage.find(expectedSubstring) != std::string::npos); + } + + { + std::unique_ptr out = this->disk->writeFile("test_file"); + writeString("test data", *out); + } + + // Test SEEK_SET + { + String buf(4, '0'); + std::unique_ptr in = this->disk->readFile("test_file"); + + in->seek(5, SEEK_SET); + + in->readStrict(buf.data(), 4); + EXPECT_EQ("data", buf); + } + + // Test SEEK_CUR + { + std::unique_ptr in = this->disk->readFile("test_file"); + String buf(4, '0'); + + in->readStrict(buf.data(), 4); + EXPECT_EQ("test", buf); + + // Skip whitespace + in->seek(1, SEEK_CUR); + + in->readStrict(buf.data(), 4); + EXPECT_EQ("data", buf); + } +} + +TEST_F(DiskTestCubeFS, iterateDirectory) +{ + this->disk->createDirectories("test_dir/nested_dir/"); + + { + auto iter = this->disk->iterateDirectory(""); + EXPECT_TRUE(iter->isValid()); + EXPECT_EQ("/test_dir/test_dir", iter->path()); + iter->next(); + EXPECT_FALSE(iter->isValid()); + } + + { + auto iter = 
this->disk->iterateDirectory("test_dir/"); + EXPECT_TRUE(iter->isValid()); + EXPECT_EQ("/test_dir/test_dir/nested_dir", iter->path()); + iter->next(); + EXPECT_FALSE(iter->isValid()); + } +} + +TEST_F(DiskTestCubeFS, others) +{ + std::cout << "getAvailableSpace result: " << disk->getAvailableSpace() << std::endl; + disk->createFile("file"); + disk->setReadOnly("file"); + std::cout << "getLastChanged result: " << disk->getLastChanged("file") << std::endl; +} + +#endif diff --git a/src/IO/ReadBufferFromCubeFS.cpp b/src/IO/ReadBufferFromCubeFS.cpp new file mode 100644 index 000000000000..07996fc4cbcb --- /dev/null +++ b/src/IO/ReadBufferFromCubeFS.cpp @@ -0,0 +1,195 @@ +#include +namespace ProfileEvents +{ +extern const Event ReadBufferFromFileDescriptorRead; +extern const Event ReadBufferFromFileDescriptorReadFailed; +extern const Event ReadBufferFromFileDescriptorReadBytes; +extern const Event DiskReadElapsedMicroseconds; +extern const Event Seek; +} + +namespace DB +{ +namespace ErrorCodes +{ + extern const int FILE_DOESNT_EXIST; + extern const int CANNOT_OPEN_FILE; + extern const int CANNOT_CLOSE_FILE; +} + +ReadBufferFromCubeFS::~ReadBufferFromCubeFS() +{ + if (fd < 0) + return; + + sdk_loader->cfsClose(id, fd); +} + +void ReadBufferFromCubeFS::close() +{ + if (fd < 0) + return; + try + { + sdk_loader->cfsClose(id, fd); + } + catch (...) + { + throw Exception("Cannot close file", ErrorCodes::CANNOT_CLOSE_FILE); + } + fd = -1; + //metric_increment.destroy(); +} + +off_t ReadBufferFromCubeFS::size() +{ + struct cfs_stat_info file_info; + int result = sdk_loader->cfsGetattr(id, const_cast(file_name.data()), &file_info); + if (result != 0) + { + // Handle the error (throw an exception, return an error code, etc.) 
+ throwFromErrnoWithPath("Cannot execute fstat " + getFileName(), getFileName(), ErrorCodes::CANNOT_FSTAT); + } + return file_info.size; +} + +bool ReadBufferFromCubeFS::nextImpl() +{ + /// If internal_buffer size is empty, then read() cannot be distinguished from EOF + assert(!internal_buffer.empty()); + /// This is a workaround of a read pass EOF bug in linux kernel with pread() + if (file_size.has_value() && file_offset_of_buffer_end >= *file_size) + return false; + size_t bytes_read = 0; + while (!bytes_read) + { + //ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorRead); + + //Stopwatch watch(profile_callback ? clock_type : CLOCK_MONOTONIC); + + ssize_t res = 0; + { + //CurrentMetrics::Increment metric_increment{CurrentMetrics::Read}; + res = sdk_loader->cfsRead(id, fd, internal_buffer.begin(), internal_buffer.size(), file_offset_of_buffer_end); + } + if (!res) + break; + + if (-1 == res && errno != EINTR) + { + //ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadFailed); + throwFromErrnoWithPath("Cannot read from file " + getFileName(), getFileName(), ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR); + } + + if (res > 0) + bytes_read += res; + + /// It reports real time spent including the time spent while thread was preempted doing nothing. + /// And it is Ok for the purpose of this watch (it is used to lower the number of threads to read from tables). + /// Sometimes it is better to use taskstats::blkio_delay_total, but it is quite expensive to get it + /// (TaskStatsInfoGetter has about 500K RPS). 
+ //watch.stop(); + // ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds()); + + // if (profile_callback) + // { + // ProfileInfo info; + // info.bytes_requested = internal_buffer.size(); + // info.bytes_read = res; + // info.nanoseconds = watch.elapsed(); + // profile_callback(info); + // } + } + + file_offset_of_buffer_end += bytes_read; + + if (bytes_read) + { + //ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadBytes, bytes_read); + working_buffer = internal_buffer; + working_buffer.resize(bytes_read); + } + else + return false; + + return true; +} + +ReadBufferFromCubeFS::ReadBufferFromCubeFS( + int64_t id_, + const std::string & file_name_, + std::shared_ptr sdk_loader_, + int flags, + char * existing_memory, + size_t buf_size, + size_t alignment, + std::optional file_size_) + : ReadBufferFromFileBase(buf_size, existing_memory, alignment, file_size_), id(id_), file_name(file_name_), sdk_loader(sdk_loader_) +{ + //ProfileEvents::increment(ProfileEvents::FileOpen); + + fd = sdk_loader->cfsOpen( + id_, const_cast(file_name_.data()), flags == -1 ? O_RDONLY | O_CLOEXEC : flags | O_CLOEXEC, S_IRUSR | S_IWUSR); + + if (-1 == fd) + throwFromErrnoWithPath( + "Cannot open file " + file_name_, file_name_, errno == ENOENT ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE); +} + +/// If 'offset' is small enough to stay in buffer after seek, then true seek in file does not happen. +off_t ReadBufferFromCubeFS::seek(off_t offset, int whence) +{ + size_t new_pos; + if (whence == SEEK_SET) + { + assert(offset >= 0); + new_pos = offset; + } + else if (whence == SEEK_CUR) + { + new_pos = file_offset_of_buffer_end - (working_buffer.end() - pos) + offset; + } + else + { + throw Exception("ReadBufferFromFileDescriptor::seek expects SEEK_SET or SEEK_CUR as whence", ErrorCodes::ARGUMENT_OUT_OF_BOUND); + } + + /// Position is unchanged. 
+ if (new_pos + (working_buffer.end() - pos) == file_offset_of_buffer_end) + return new_pos; + + if (file_offset_of_buffer_end - working_buffer.size() <= static_cast(new_pos) && new_pos <= file_offset_of_buffer_end) + { + /// Position is still inside the buffer. + /// Probably it is at the end of the buffer - then we will load data on the following 'next' call. + + pos = working_buffer.end() - file_offset_of_buffer_end + new_pos; + assert(pos >= working_buffer.begin()); + assert(pos <= working_buffer.end()); + + return new_pos; + } + else + { + /// Position is out of the buffer, we need to do real seek. + off_t seek_pos = new_pos; + + off_t offset_after_seek_pos = new_pos - seek_pos; + + /// First reset the buffer so the next read will fetch new data to the buffer. + resetWorkingBuffer(); + + /// In case of using 'pread' we just update the info about the next position in file. + /// In case of using 'read' we call 'lseek'. + + /// We account both cases as seek event as it leads to non-contiguous reads from file. 
+ ProfileEvents::increment(ProfileEvents::Seek); + file_offset_of_buffer_end = seek_pos; + + if (offset_after_seek_pos > 0) + ignore(offset_after_seek_pos); + + return seek_pos; + } +} +} diff --git a/src/IO/ReadBufferFromCubeFS.h b/src/IO/ReadBufferFromCubeFS.h new file mode 100644 index 000000000000..8eb09d886408 --- /dev/null +++ b/src/IO/ReadBufferFromCubeFS.h @@ -0,0 +1,52 @@ +#pragma once + +#include +#include + +namespace DB +{ +namespace ErrorCodes +{ + extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR; + extern const int ARGUMENT_OUT_OF_BOUND; + extern const int CANNOT_SEEK_THROUGH_FILE; + extern const int CANNOT_SELECT; + extern const int CANNOT_FSTAT; + extern const int CANNOT_ADVISE; +} +class ReadBufferFromCubeFS : public ReadBufferFromFileBase +{ +public: + explicit ReadBufferFromCubeFS( + int64_t id_, + const std::string & file_name_, + std::shared_ptr sdk_loader_, + int flags = -1, + char * existing_memory = nullptr, + size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, + size_t alignment = 0, + std::optional file_size_ = std::nullopt); + ~ReadBufferFromCubeFS() override; + std::string getFileName() const override { return file_name; } + Range getRemainingReadRange() const override { return Range{.left = file_offset_of_buffer_end, .right = std::nullopt}; } + size_t getFileOffsetOfBufferEnd() const override { return file_offset_of_buffer_end; } + off_t getPosition() override { return file_offset_of_buffer_end - (working_buffer.end() - pos); } + void close(); + /** Could be used before initialization if needed 'fd' was not passed to constructor. + * It's not possible to change 'fd' during work. + */ + void setFD(int fd_) { fd = fd_; } + int getFD() const { return fd; } + off_t size(); + /// If 'offset' is small enough to stay in buffer after seek, then true seek in file does not happen. 
+ off_t seek(off_t off, int whence) override; + +private: + bool nextImpl() override; + int fd; + int64_t id; + std::string file_name; + std::shared_ptr sdk_loader; + size_t file_offset_of_buffer_end = 0; /// What offset in file corresponds to working_buffer.end(). +}; +} diff --git a/src/IO/WriteBufferFromCubeFS.cpp b/src/IO/WriteBufferFromCubeFS.cpp new file mode 100644 index 000000000000..ba63a2ddffaf --- /dev/null +++ b/src/IO/WriteBufferFromCubeFS.cpp @@ -0,0 +1,122 @@ +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int FILE_DOESNT_EXIST; + extern const int CANNOT_OPEN_FILE; + extern const int CANNOT_CLOSE_FILE; + extern const int CANNOT_WRITE_TO_FILE_DESCRIPTOR; + extern const int CANNOT_FSYNC; + extern const int CANNOT_SEEK_THROUGH_FILE; + extern const int CANNOT_TRUNCATE_FILE; + extern const int CANNOT_FSTAT; +} + +WriteBufferFromCubeFS::WriteBufferFromCubeFS( + int64_t id_, + const std::string & file_name_, + std::shared_ptr sdk_loader_, + size_t buf_size, + int flags, + mode_t mode, + char * existing_memory, + size_t alignment) + : WriteBufferFromFileBase(buf_size, existing_memory, alignment), id(id_), file_name(std::move(file_name_)), sdk_loader(sdk_loader_) +{ + //If O_APPEND is not added, cfsOpen will write from the specified offset position (0) each time. + fd = sdk_loader->cfsOpen( + id, + const_cast(file_name_.data()), + flags == -1 ? O_WRONLY | O_TRUNC | O_CREAT | O_CLOEXEC | O_APPEND : flags | O_CLOEXEC, + mode); + + if (-1 == fd) + throwFromErrnoWithPath( + "Cannot open file " + file_name, file_name, errno == ENOENT ? 
ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE); +} + +WriteBufferFromCubeFS::~WriteBufferFromCubeFS() +{ + finalize(); + sdk_loader->cfsClose(id, fd); +} + +void WriteBufferFromCubeFS::close() +{ + if (fd < 0) + return; + next(); + sdk_loader->cfsClose(id, fd); + fd = -1; +} + +void WriteBufferFromCubeFS::finalizeImpl() +{ + if (fd < 0) + return; + + next(); +} + +void WriteBufferFromCubeFS::nextImpl() +{ + if (!offset()) + return; + + //Stopwatch watch; + + size_t bytes_written = 0; + while (bytes_written != offset()) + { + //ProfileEvents::increment(ProfileEvents::WriteBufferFromFileDescriptorWrite); + + ssize_t res = 0; + { + //CurrentMetrics::Increment metric_increment{CurrentMetrics::Write}; + res = sdk_loader->cfsWrite(id, fd, working_buffer.begin() + bytes_written, offset() - bytes_written, 0); + } + + if ((-1 == res || 0 == res) && errno != EINTR) + { + //ProfileEvents::increment(ProfileEvents::WriteBufferFromFileDescriptorWriteFailed); + + /// Don't use getFileName() here because this method can be called from destructor + String error_file_name = file_name; + if (error_file_name.empty()) + error_file_name = "(fd = " + toString(fd) + ")"; + throwFromErrnoWithPath("Cannot write to file " + error_file_name, error_file_name, ErrorCodes::CANNOT_WRITE_TO_FILE_DESCRIPTOR); + } + + if (res > 0) + bytes_written += res; + } + + //ProfileEvents::increment(ProfileEvents::DiskWriteElapsedMicroseconds, watch.elapsedMicroseconds()); + //ProfileEvents::increment(ProfileEvents::WriteBufferFromFileDescriptorWriteBytes, bytes_written); +} + +void WriteBufferFromCubeFS::sync() +{ + /// If buffer has pending data - write it. 
+ next(); + + int res = sdk_loader->cfsFlush(id, fd); + if (-1 == res) + throwFromErrnoWithPath("Cannot flush " + getFileName(), getFileName(), ErrorCodes::CANNOT_FSYNC); +} + +off_t WriteBufferFromCubeFS::size() +{ + struct cfs_stat_info file_info; + int result = sdk_loader->cfsGetattr(id, const_cast(file_name.data()), &file_info); + if (result != 0) + { + // Handle the error (throw an exception, return an error code, etc.) + throwFromErrnoWithPath("Cannot execute fstat " + getFileName(), getFileName(), ErrorCodes::CANNOT_FSTAT); + } + return file_info.size; +} +} diff --git a/src/IO/WriteBufferFromCubeFS.h b/src/IO/WriteBufferFromCubeFS.h new file mode 100644 index 000000000000..efbb283d250a --- /dev/null +++ b/src/IO/WriteBufferFromCubeFS.h @@ -0,0 +1,38 @@ +#pragma once +#include +#include + +namespace Poco +{ +class Logger; +} +namespace DB +{ +class WriteBufferFromCubeFS : public WriteBufferFromFileBase +{ +public: + explicit WriteBufferFromCubeFS( + int64_t id, + const std::string & file_name_, + std::shared_ptr sdk_loader_, + size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, + int flags = -1, + mode_t mode = 0666, + char * existing_memory = nullptr, + size_t alignment = 0); + ~WriteBufferFromCubeFS() override; + void close(); + void sync() override; + std::string getFileName() const override { return file_name; } + off_t size(); + Poco::Logger * log = &Poco::Logger::get("WriteBufferFromCubeFS"); + +private: + void finalizeImpl() override; + void nextImpl() override; + int64_t id; + std::string file_name; + std::shared_ptr sdk_loader; + int fd; +}; +}