From 2d95546db0a9e4dbce13f38ee812eb84bb17d0d7 Mon Sep 17 00:00:00 2001 From: xieyichen Date: Thu, 2 Nov 2023 16:49:33 +0800 Subject: [PATCH] support cubefs sdk --- .gitmodules | 4 + contrib/CMakeLists.txt | 1 + contrib/cubefs | 1 + contrib/cubefs-cmake/CMakeLists.txt | 30 + programs/CMakeLists.txt | 25 + programs/config_tools.h.in | 1 + programs/cubefs-test/CMakeLists.txt | 4 + .../cubefs-test/clickhouse-cubefs-test.cpp | 5 + programs/cubefs-test/cubefs-dlopen.cpp | 165 +++++ programs/cubefs-test/cubefs-test.cpp | 103 +++ programs/main.cpp | 6 + src/CMakeLists.txt | 13 +- src/Disks/CubeFS/DiskCubeFS.cpp | 675 ++++++++++++++++++ src/Disks/CubeFS/DiskCubeFS.h | 110 +++ src/Disks/CubeFS/registerDiskCubeFS.cpp | 49 ++ src/Disks/CubeFS/sdkLoader.cpp | 48 ++ src/Disks/CubeFS/sdkLoader.h | 63 ++ src/Disks/DiskType.h | 3 + src/Disks/registerDisks.cpp | 3 +- src/Disks/tests/gtest_disk_cubefs.cpp | 256 +++++++ src/IO/ReadBufferFromCubeFS.cpp | 195 +++++ src/IO/ReadBufferFromCubeFS.h | 52 ++ src/IO/WriteBufferFromCubeFS.cpp | 122 ++++ src/IO/WriteBufferFromCubeFS.h | 38 + 24 files changed, 1970 insertions(+), 2 deletions(-) create mode 160000 contrib/cubefs create mode 100644 contrib/cubefs-cmake/CMakeLists.txt create mode 100644 programs/cubefs-test/CMakeLists.txt create mode 100644 programs/cubefs-test/clickhouse-cubefs-test.cpp create mode 100644 programs/cubefs-test/cubefs-dlopen.cpp create mode 100644 programs/cubefs-test/cubefs-test.cpp create mode 100644 src/Disks/CubeFS/DiskCubeFS.cpp create mode 100644 src/Disks/CubeFS/DiskCubeFS.h create mode 100644 src/Disks/CubeFS/registerDiskCubeFS.cpp create mode 100644 src/Disks/CubeFS/sdkLoader.cpp create mode 100644 src/Disks/CubeFS/sdkLoader.h create mode 100644 src/Disks/tests/gtest_disk_cubefs.cpp create mode 100644 src/IO/ReadBufferFromCubeFS.cpp create mode 100644 src/IO/ReadBufferFromCubeFS.h create mode 100644 src/IO/WriteBufferFromCubeFS.cpp create mode 100644 src/IO/WriteBufferFromCubeFS.h diff --git a/.gitmodules b/.gitmodules index 6c9e66f9cbc7..4ad531684245 100644 --- a/.gitmodules +++ b/.gitmodules @@ -262,3 +262,7 @@ [submodule "contrib/minizip-ng"] path = contrib/minizip-ng url = https://github.com/zlib-ng/minizip-ng +[submodule "contrib/cubefs"] + path = contrib/cubefs + url = https://github.com/RunningXie/cubefs.git + branch = fix_sdk_compilation diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index 9cf307c473ee..e1e18fcd6e1b 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -135,6 +135,7 @@ add_contrib (libpq-cmake libpq) add_contrib (nuraft-cmake NuRaft) add_contrib (fast_float-cmake fast_float) add_contrib (datasketches-cpp-cmake datasketches-cpp) +add_contrib (cubefs-cmake cubefs) option(ENABLE_NLP "Enable NLP functions support" ${ENABLE_LIBRARIES}) if (ENABLE_NLP) diff --git a/contrib/cubefs b/contrib/cubefs new file mode 160000 index 000000000000..5eb497912217 --- /dev/null +++ b/contrib/cubefs @@ -0,0 +1 @@ +Subproject commit 5eb4979122170f02e2c2fc4e14f46f6092c4e08e diff --git a/contrib/cubefs-cmake/CMakeLists.txt b/contrib/cubefs-cmake/CMakeLists.txt new file mode 100644 index 000000000000..abec8b08c765 --- /dev/null +++ b/contrib/cubefs-cmake/CMakeLists.txt @@ -0,0 +1,30 @@ +cmake_minimum_required(VERSION 3.20) +message(STATUS "start handle contrib/cubefs-cmake/CMakeLists.txt") +set(CUBEFS_DIR "${ClickHouse_SOURCE_DIR}/contrib/cubefs") +message(STATUS "start execute_process") +execute_process( + COMMAND make libsdk + WORKING_DIRECTORY ${CUBEFS_DIR}) +add_library(_cubefs STATIC IMPORTED GLOBAL) +set_target_properties(_cubefs PROPERTIES + IMPORTED_LOCATION "${CUBEFS_DIR}/build/bin/libcfs.a" +) +add_library(ch_contrib::cubefs ALIAS _cubefs) +set(FILE_PATH "${CUBEFS_DIR}/libsdk/libcfs.h") +# Add configuration to ignore warnings in the header file +file(STRINGS ${FILE_PATH} FILE_CONTENTS) +list(FIND FILE_CONTENTS "#pragma GCC diagnostic push" PRAGMA_INDEX) +if (PRAGMA_INDEX EQUAL -1) + execute_process( + COMMAND sh -c "echo '#pragma GCC diagnostic push\n#pragma GCC diagnostic ignored \"-Wreserved-identifier\"\n#pragma GCC diagnostic ignored \"-Wmacro-redefined\"\n' | cat - ${FILE_PATH}" + OUTPUT_FILE "${FILE_PATH}.tmp" + ) + execute_process( + COMMAND sh -c "echo '#pragma GCC diagnostic pop' >> ${FILE_PATH}.tmp" + ) + execute_process( + COMMAND mv "${FILE_PATH}.tmp" "${FILE_PATH}" + ) +endif() + + diff --git a/programs/CMakeLists.txt b/programs/CMakeLists.txt index cca7be97b61b..629e0c0fc8a0 100644 --- a/programs/CMakeLists.txt +++ b/programs/CMakeLists.txt @@ -24,6 +24,9 @@ option (ENABLE_CLICKHOUSE_EXTRACT_FROM_CONFIG "Configs processor (extract values # https://clickhouse.com/docs/en/operations/utilities/clickhouse-compressor/ option (ENABLE_CLICKHOUSE_COMPRESSOR "Data compressor and decompressor" ${ENABLE_CLICKHOUSE_ALL}) +# cubefs test +option (ENABLE_CLICKHOUSE_CUBEFS_TEST "Cubefs test" ${ENABLE_CLICKHOUSE_ALL}) + # https://clickhouse.com/docs/en/operations/utilities/clickhouse-copier/ option (ENABLE_CLICKHOUSE_COPIER "Inter-cluster data copying mode" ${ENABLE_CLICKHOUSE_ALL}) @@ -114,6 +117,12 @@ else() message(STATUS "Copier mode: OFF") endif() +if (ENABLE_CLICKHOUSE_CUBEFS_TEST) + message(STATUS "Cubefs test: ON") +else() + message(STATUS "Cubefs test: OFF") +endif() + if (ENABLE_CLICKHOUSE_FORMAT) message(STATUS "Format mode: ON") else() @@ -230,6 +239,7 @@ add_subdirectory (install) add_subdirectory (git-import) add_subdirectory (bash-completion) add_subdirectory (static-files-disk-uploader) +add_subdirectory (cubefs-test) if (ENABLE_CLICKHOUSE_KEEPER) add_subdirectory (keeper) @@ -261,6 +271,7 @@ if (CLICKHOUSE_ONE_SHARED) ${CLICKHOUSE_GIT_IMPORT_SOURCES} ${CLICKHOUSE_ODBC_BRIDGE_SOURCES} ${CLICKHOUSE_KEEPER_SOURCES} + ${CLICKHOUSE_CUBEFS_TEST_SOURCES} ${CLICKHOUSE_KEEPER_CONVERTER_SOURCES} ${CLICKHOUSE_STATIC_FILES_DISK_UPLOADER_SOURCES}) @@ -277,6 +288,7 @@ if (CLICKHOUSE_ONE_SHARED) ${CLICKHOUSE_GIT_IMPORT_LINK} ${CLICKHOUSE_ODBC_BRIDGE_LINK} ${CLICKHOUSE_KEEPER_LINK} + ${CLICKHOUSE_CUBEFS_TEST_LINK} ${CLICKHOUSE_KEEPER_CONVERTER_LINK} ${CLICKHOUSE_STATIC_FILES_DISK_UPLOADER_LINK}) @@ -293,6 +305,7 @@ if (CLICKHOUSE_ONE_SHARED) ${CLICKHOUSE_GIT_IMPORT_INCLUDE} ${CLICKHOUSE_ODBC_BRIDGE_INCLUDE} ${CLICKHOUSE_KEEPER_INCLUDE} + ${CLICKHOUSE_CUBEFS_TEST_INCLUDE} ${CLICKHOUSE_KEEPER_CONVERTER_INCLUDE}) set_target_properties(clickhouse-lib PROPERTIES SOVERSION ${VERSION_MAJOR}.${VERSION_MINOR} VERSION ${VERSION_SO} OUTPUT_NAME clickhouse DEBUG_POSTFIX "") @@ -330,6 +343,10 @@ if (CLICKHOUSE_SPLIT_BINARY) list (APPEND CLICKHOUSE_ALL_TARGETS clickhouse-keeper-converter) endif () + if (ENABLE_CLICKHOUSE_CUBEFS_TEST) + list (APPEND CLICKHOUSE_ALL_TARGETS clickhouse-cubefs-test) + endif () + set_target_properties(${CLICKHOUSE_ALL_TARGETS} PROPERTIES RUNTIME_OUTPUT_DIRECTORY ..) add_custom_target (clickhouse-bundle ALL DEPENDS ${CLICKHOUSE_ALL_TARGETS}) @@ -356,6 +373,9 @@ else () if (ENABLE_CLICKHOUSE_LOCAL) clickhouse_target_link_split_lib(clickhouse local) endif () + if (ENABLE_CLICKHOUSE_CUBEFS_TEST) + clickhouse_target_link_split_lib(clickhouse cubefs-test) + endif () if (ENABLE_CLICKHOUSE_BENCHMARK) clickhouse_target_link_split_lib(clickhouse benchmark) endif () @@ -416,6 +436,11 @@ else () install (FILES "${CMAKE_CURRENT_BINARY_DIR}/clickhouse-copier" DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse) list(APPEND CLICKHOUSE_BUNDLE clickhouse-copier) endif () + if (ENABLE_CLICKHOUSE_CUBEFS_TEST) + add_custom_target (clickhouse-cubefs-test ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-cubefs-test DEPENDS clickhouse) + install (FILES "${CMAKE_CURRENT_BINARY_DIR}/clickhouse-cubefs-test" DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse) + list(APPEND CLICKHOUSE_BUNDLE clickhouse-cubefs-test) + endif () if (ENABLE_CLICKHOUSE_EXTRACT_FROM_CONFIG) add_custom_target (clickhouse-extract-from-config ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-extract-from-config DEPENDS clickhouse) install (FILES "${CMAKE_CURRENT_BINARY_DIR}/clickhouse-extract-from-config" DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse) diff --git a/programs/config_tools.h.in b/programs/config_tools.h.in index b97eb63b5352..30f890045696 100644 --- a/programs/config_tools.h.in +++ b/programs/config_tools.h.in @@ -19,3 +19,4 @@ #cmakedefine01 ENABLE_CLICKHOUSE_KEEPER #cmakedefine01 ENABLE_CLICKHOUSE_KEEPER_CONVERTER #cmakedefine01 ENABLE_CLICKHOUSE_STATIC_FILES_DISK_UPLOADER +#cmakedefine01 ENABLE_CLICKHOUSE_CUBEFS_TEST diff --git a/programs/cubefs-test/CMakeLists.txt b/programs/cubefs-test/CMakeLists.txt new file mode 100644 index 000000000000..54f1628419a6 --- /dev/null +++ b/programs/cubefs-test/CMakeLists.txt @@ -0,0 +1,4 @@ +set (CLICKHOUSE_CUBEFS_TEST_SOURCES cubefs-test.cpp clickhouse-cubefs-test.cpp) +set (CLICKHOUSE_CUBEFS_TEST_LINK dbms ch_contrib::cubefs) +set (CLICKHOUSE_CUBEFS_TEST_INCLUDE PUBLIC ${ClickHouse_SOURCE_DIR}/contrib/cubefs/libsdk) +clickhouse_program_add(cubefs-test) diff --git a/programs/cubefs-test/clickhouse-cubefs-test.cpp b/programs/cubefs-test/clickhouse-cubefs-test.cpp new file mode 100644 index 000000000000..384aea9916dc --- /dev/null +++ b/programs/cubefs-test/clickhouse-cubefs-test.cpp @@ -0,0 +1,5 @@ +int mainEntryClickHouseCubefsTest(int argc, char ** argv); +int main(int argc_, char ** argv_) +{ + return mainEntryClickHouseCubefsTest(argc_, argv_); +} diff --git a/programs/cubefs-test/cubefs-dlopen.cpp b/programs/cubefs-test/cubefs-dlopen.cpp new file mode 100644 index 000000000000..996106eff82b --- /dev/null +++ b/programs/cubefs-test/cubefs-dlopen.cpp @@ -0,0 +1,165 @@ +#include +#include +#include +#include + + +int main() +{ + void * handle = dlopen("./libcfs.so", RTLD_LAZY); + if (!handle) + { + std::cerr << "Failed to open the dynamic library: " << dlerror() << std::endl; + return 1; + } + typedef int64_t (*CfsNewClientFunction)(); + typedef int (*CfsSetClientFunction)(int64_t, char *, char *); + typedef int (*CfsStartClientFunction)(int64_t); + typedef void (*CfsCloseClientFunction)(int64_t); + typedef int (*CfsChdirFunction)(int64_t, char *); + typedef char * (*CfsGetcwdFunction)(int64_t); + typedef int (*CfsGetattrFunction)(int64_t, char *, struct cfs_stat_info *); + typedef int (*CfsSetattrFunction)(int64_t, char *, struct cfs_stat_info *, int); + typedef int (*CfsOpenFunction)(int64_t, char *, int, mode_t); + typedef int (*CfsFlushFunction)(int64_t, int); + typedef void (*CfsCloseFunction)(int64_t, int); + typedef ssize_t (*CfsWriteFunction)(int64_t, int, void *, size_t, off_t); + typedef ssize_t (*CfsReadFunction)(int64_t, int, void *, size_t, off_t); + typedef int (*CfsBatchGetInodesFunction)(int64_t, int, void *, GoSlice, int); + typedef int (*CfsRefreshSummaryFunction)(int64_t, char *, int); + typedef int (*CfsReaddirFunction)(int64_t, int, GoSlice, int); + typedef int (*CfsLsdirFunction)(int64_t, int, GoSlice, int); + typedef int (*CfsMkdirsFunction)(int64_t, char *, mode_t); + typedef int (*CfsRmdirFunction)(int64_t, char *); + typedef int (*CfsUnlinkFunction)(int64_t, char *); + typedef int (*CfsRenameFunction)(int64_t, char *, char *); + typedef int (*CfsFchmodFunction)(int64_t, int, mode_t); + typedef int (*CfsGetsummaryFunction)(int64_t, char *, struct cfs_summary_info *, char *, int); + CfsNewClientFunction cfsNewClient = (CfsNewClientFunction)dlsym(handle, "cfs_new_client"); + CfsSetClientFunction cfsSetClient = (CfsSetClientFunction)dlsym(handle, "cfs_set_client"); + CfsStartClientFunction cfsStartClient = (CfsStartClientFunction)dlsym(handle, "cfs_start_client"); + CfsCloseClientFunction cfsCloseClient = (CfsCloseClientFunction)dlsym(handle, "cfs_close_client"); + CfsChdirFunction cfsChdir = (CfsChdirFunction)dlsym(handle, "cfs_chdir"); + CfsGetcwdFunction cfsGetcwd = (CfsGetcwdFunction)dlsym(handle, "cfs_getcwd"); + CfsGetattrFunction cfsGetattr = (CfsGetattrFunction)dlsym(handle, "cfs_getattr"); + CfsSetattrFunction cfsSetattr = (CfsSetattrFunction)dlsym(handle, "cfs_setattr"); + CfsOpenFunction cfsOpen = (CfsOpenFunction)dlsym(handle, "cfs_open"); + CfsFlushFunction cfsFlush = (CfsFlushFunction)dlsym(handle, "cfs_flush"); + CfsCloseFunction cfsClose = (CfsCloseFunction)dlsym(handle, "cfs_close"); + CfsLsdirFunction cfsLsdir = (CfsLsdirFunction)dlsym(handle, "cfs_lsdir"); + CfsMkdirsFunction cfsMkdirs = (CfsMkdirsFunction)dlsym(handle, "cfs_mkdirs"); + CfsRmdirFunction cfsRmdir = (CfsRmdirFunction)dlsym(handle, "cfs_rmdir"); + CfsUnlinkFunction cfsUnlink = (CfsUnlinkFunction)dlsym(handle, "cfs_unlink"); + CfsRenameFunction cfsRename = (CfsRenameFunction)dlsym(handle, "cfs_rename"); + CfsFchmodFunction cfsFchmod = (CfsFchmodFunction)dlsym(handle, "cfs_fchmod"); + CfsGetsummaryFunction cfsGetsummary = (CfsGetsummaryFunction)dlsym(handle, "cfs_getsummary"); + CfsWriteFunction cfsWrite = (CfsWriteFunction)dlsym(handle, "cfs_write"); + CfsReadFunction cfsRead = (CfsReadFunction)dlsym(handle, "cfs_read"); + if (!cfsNewClient || !cfsSetClient || !cfsStartClient || !cfsCloseClient || !cfsChdir || !cfsGetcwd || !cfsGetattr || !cfsSetattr + || !cfsOpen || !cfsFlush || !cfsClose || !cfsWrite || !cfsRead || !cfsLsdir || !cfsMkdirs || !cfsRmdir || !cfsUnlink || !cfsRename + || !cfsFchmod || !cfsGetsummary) + { + std::cerr << "Failed to find one or more functions: " << dlerror() << std::endl; + dlclose(handle); + return 1; + } + int64_t clientId = cfsNewClient(); + if (clientId == 0) + { + std::cerr << "Failed to create client" << std::endl; + dlclose(handle); + return 1; + } + int result = cfsSetClient(clientId, strdup("volName"), strdup("xieyichen")); + if (result != 0) + { + std::cerr << "Failed to set client" << std::endl; + cfsCloseClient(clientId); + dlclose(handle); + return 1; + } + result = cfsSetClient(clientId, strdup("masterAddr"), strdup("cfs-south.oppo.local")); + if (result != 0) + { + std::cerr << "Failed to set client" << std::endl; + cfsCloseClient(clientId); + dlclose(handle); + return 1; + } + result = cfsSetClient(clientId, strdup("logDir"), strdup("/home/service/var/logs/cfs/test-log")); + if (result != 0) + { + std::cerr << "Failed to set client" << std::endl; + cfsCloseClient(clientId); + dlclose(handle); + return 1; + } + result = cfsSetClient(clientId, strdup("logLevel"), strdup("debug")); + if (result != 0) + { + std::cerr << "Failed to set client" << std::endl; + cfsCloseClient(clientId); + dlclose(handle); + return 1; + } + result = cfsSetClient(clientId, strdup("accessKey"), strdup("jRlZO65q7XlH5bnV")); + if (result != 0) + { + std::cerr << "Failed to set client" << std::endl; + cfsCloseClient(clientId); + dlclose(handle); + return 1; + } + result = cfsSetClient(clientId, strdup("secretKey"), strdup("V1m730UzREHaK1jCkC0kL0cewOX0kH3K")); + if (result != 0) + { + std::cerr << "Failed to set client" << std::endl; + cfsCloseClient(clientId); + dlclose(handle); + return 1; + } + result = cfsSetClient(clientId, strdup("pushAddr"), strdup("cfs.dg-push.wanyol.com")); + if (result != 0) + { + std::cerr << "Failed to set client" << std::endl; + cfsCloseClient(clientId); + dlclose(handle); + return 1; + } + result = cfsStartClient(clientId); + if (result != 0) + { + std::cerr << "Failed to start client" << std::endl; + cfsCloseClient(clientId); + dlclose(handle); + return 1; + } + int fd = cfsOpen(clientId, strdup("/test_dir/file.txt"), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR); + if (fd < 0) + { + printf("Failed to open file\n"); + cfsCloseClient(clientId); + return -1; + } + const char * data = "xieyichen"; + if (cfsWrite(clientId, fd, static_cast(const_cast(data)), strlen(data), 0) < 0) + { + printf("Failed to write file\n"); + cfsClose(clientId, fd); + cfsCloseClient(clientId); + return -1; + } + char buffer[1024]; + memset(buffer, 0, sizeof(buffer)); + if (cfsRead(clientId, fd, static_cast(const_cast(buffer)), sizeof(buffer), 0) < 0) + { + printf("Failed to read file\n"); + cfsClose(clientId, fd); + cfsCloseClient(clientId); + return -1; + } + printf("Read file content: %s\n", buffer); + cfsCloseClient(clientId); + dlclose(handle); + return 0; +} diff --git a/programs/cubefs-test/cubefs-test.cpp b/programs/cubefs-test/cubefs-test.cpp new file mode 100644 index 000000000000..a1c54ed1baa2 --- /dev/null +++ b/programs/cubefs-test/cubefs-test.cpp @@ -0,0 +1,103 @@ +#include +#include +#include +#include +#include +#include + +void readWriteTest() +{ + int64_t id = cfs_new_client(); + if (id <= 0) + { + printf("Failed to create client\n"); + return; + } + if (cfs_set_client(id, strdup("volName"), strdup("xieyichen")) != 0) + { + printf("Failed to set client info\n"); + cfs_close_client(id); + return; + } + if (cfs_set_client(id, strdup("masterAddr"), strdup("cfs-south.oppo.local")) != 0) + { + printf("Failed to set client info\n"); + cfs_close_client(id); + return; + } + if (cfs_set_client(id, strdup("logDir"), strdup("/home/service/var/logs/cfs/test-log")) != 0) + { + printf("Failed to set client info\n"); + cfs_close_client(id); + return; + } + if (cfs_set_client(id, strdup("logLevel"), strdup("debug")) != 0) + { + printf("Failed to set client info\n"); + cfs_close_client(id); + return; + } + if (cfs_set_client(id, strdup("accessKey"), strdup("jRlZO65q7XlH5bnV")) != 0) + { + printf("Failed to set client info\n"); + cfs_close_client(id); + return; + } + if (cfs_set_client(id, strdup("secretKey"), strdup("V1m730UzREHaK1jCkC0kL0cewOX0kH3K")) != 0) + { + printf("Failed to set client info\n"); + cfs_close_client(id); + return; + } + if (cfs_set_client(id, strdup("pushAddr"), strdup("cfs.dg-push.wanyol.com")) != 0) + { + printf("Failed to set client info\n"); + cfs_close_client(id); + return; + } + if (cfs_start_client(id) != 0) + { + printf("Failed to start client \n"); + cfs_close_client(id); + return; + } + int fd = cfs_open(id, strdup("/test_dir/file.txt"), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR); + if (fd < 0) + { + printf("Failed to open file\n"); + cfs_close_client(id); + return; + } + + const char * data = "Hello, world!"; + if (cfs_write(id, fd, static_cast(const_cast(data)), strlen(data), 0) < 0) + { + printf("Failed to write file\n"); + cfs_close(id, fd); + cfs_close_client(id); + return; + } + + char buffer[1024]; + memset(buffer, 0, sizeof(buffer)); + if (cfs_read(id, fd, static_cast(const_cast(buffer)), sizeof(buffer), 0) < 0) + { + printf("Failed to read file\n"); + cfs_close(id, fd); + cfs_close_client(id); + return; + } + + printf("Read file content: %s\n", buffer); + cfs_close(id, fd); + cfs_close_client(id); +} + +int mainEntryClickHouseCubefsTest(int argc, char ** argv) +{ + std::thread worker(readWriteTest); + worker.join(); + std::cout << "argc = " << argc << std::endl; + std::cout << "argv = " << argv << std::endl; + return 0; +} diff --git a/programs/main.cpp b/programs/main.cpp index 2cdda075ca7d..f8573e13ce78 100644 --- a/programs/main.cpp +++ b/programs/main.cpp @@ -63,6 +63,9 @@ int mainEntryClickHouseKeeperConverter(int argc, char ** argv); #if ENABLE_CLICKHOUSE_STATIC_FILES_DISK_UPLOADER int mainEntryClickHouseStaticFilesDiskUploader(int argc, char ** argv); #endif +#if ENABLE_CLICKHOUSE_CUBEFS_TEST +int mainEntryClickHouseCubefsTest(int argc, char ** argv); +#endif #if ENABLE_CLICKHOUSE_INSTALL int mainEntryClickHouseInstall(int argc, char ** argv); int mainEntryClickHouseStart(int argc, char ** argv); @@ -127,6 +130,9 @@ std::pair clickhouse_applications[] = #if ENABLE_CLICKHOUSE_KEEPER_CONVERTER {"keeper-converter", mainEntryClickHouseKeeperConverter}, #endif +#if ENABLE_CLICKHOUSE_CUBEFS_TEST + {"cubefs-test", mainEntryClickHouseCubefsTest}, +#endif #if ENABLE_CLICKHOUSE_INSTALL {"install", mainEntryClickHouseInstall}, {"start", mainEntryClickHouseStart}, diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index b24181625d3f..22171706fb8d 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -75,12 +75,13 @@ add_subdirectory (Coordination) set(dbms_headers) set(dbms_sources) - +message("ClickHouse_SOURCE_DIR: ${ClickHouse_SOURCE_DIR}") add_headers_and_sources(clickhouse_common_io Common) add_headers_and_sources(clickhouse_common_io Common/HashTable) add_headers_and_sources(clickhouse_common_io IO) add_headers_and_sources(clickhouse_common_io IO/Archives) add_headers_and_sources(clickhouse_common_io IO/S3) +add_headers_and_sources(clickhouse_common_io ${ClickHouse_SOURCE_DIR}/contrib/cubefs/libsdk) list (REMOVE_ITEM clickhouse_common_io_sources Common/malloc.cpp Common/new_delete.cpp) add_headers_and_sources(dbms Disks/IO) @@ -111,6 +112,11 @@ if (TARGET ch_contrib::aws_s3) add_headers_and_sources(dbms Disks/S3) endif() +if (TARGET ch_contrib::cubefs) + add_headers_and_sources(dbms Disks/CubeFS) + add_headers_and_sources(dbms ${ClickHouse_SOURCE_DIR}/contrib/cubefs/libsdk) +endif() + if (TARGET ch_contrib::azure_sdk) add_headers_and_sources(dbms Disks/AzureBlobStorage) endif() @@ -304,6 +310,7 @@ endmacro () dbms_target_include_directories (PUBLIC "${ClickHouse_SOURCE_DIR}/src" "${ClickHouse_BINARY_DIR}/src") target_include_directories (clickhouse_common_io PUBLIC "${ClickHouse_SOURCE_DIR}/src" "${ClickHouse_BINARY_DIR}/src") +target_include_directories(clickhouse_common_io PUBLIC ${ClickHouse_SOURCE_DIR}/contrib/cubefs/libsdk) if (TARGET ch_contrib::llvm) dbms_target_link_libraries (PUBLIC ch_contrib::llvm) @@ -464,6 +471,10 @@ if (TARGET ch_contrib::aws_s3) target_link_libraries (clickhouse_common_io PUBLIC ch_contrib::aws_s3) endif() +if (TARGET ch_contrib::cubefs) + target_link_libraries (clickhouse_common_io PUBLIC ch_contrib::cubefs) +endif() + if (TARGET ch_contrib::azure_sdk) target_link_libraries (clickhouse_common_io PUBLIC ch_contrib::azure_sdk) endif() diff --git a/src/Disks/CubeFS/DiskCubeFS.cpp b/src/Disks/CubeFS/DiskCubeFS.cpp new file mode 100644 index 000000000000..8eaec9f10236 --- /dev/null +++ b/src/Disks/CubeFS/DiskCubeFS.cpp @@ -0,0 +1,675 @@ +#include "DiskCubeFS.h" +#include +#include +#include +#include +#include +namespace CurrentMetrics +{ +extern const Metric DiskSpaceReservedForMerge; +} + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int UNKNOWN_ELEMENT_IN_CONFIG; + extern const int EXCESSIVE_ELEMENT_IN_CONFIG; + extern const int PATH_ACCESS_DENIED; + extern const int LOGICAL_ERROR; + extern const int CANNOT_TRUNCATE_FILE; + extern const int CANNOT_UNLINK; + extern const int CANNOT_RMDIR; + extern const int BAD_ARGUMENTS; + extern const int CANNOT_CREATE_DIRECTORY; + extern const int CANNOT_STATVFS; + extern const int CANNOT_OPEN_FILE; + extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR; + extern const int CANNOT_CLOSE_FILE; + extern const int ATOMIC_RENAME_FAIL; + extern const int FILE_ALREADY_EXISTS; + extern const int DIRECTORY_DOESNT_EXIST; + extern const int FILE_DOESNT_EXIST; +} + +const int default_readdir_count = 10000; +std::mutex DiskCubeFS::reservation_mutex; +using DiskCubeFSPtr = std::shared_ptr; + +class DiskCubeFSReservation : public IReservation +{ +public: + DiskCubeFSReservation(const DiskCubeFSPtr & disk_, UInt64 size_) + : disk(disk_), size(size_), metric_increment(CurrentMetrics::DiskSpaceReservedForMerge, size_) + { + } + + UInt64 getSize() const override { return size; } + + DiskPtr getDisk(size_t i) const override + { + if (i != 0) + throw Exception("Can't use i != 0 with single disk reservation. It's a bug", ErrorCodes::LOGICAL_ERROR); + return disk; + } + + Disks getDisks() const override { return {disk}; } + + void update(UInt64 new_size) override + { + std::lock_guard lock(DiskCubeFS::reservation_mutex); + disk->reserved_bytes -= size; + size = new_size; + disk->reserved_bytes += size; + } + + ~DiskCubeFSReservation() override + { + try + { + std::lock_guard lock(DiskCubeFS::reservation_mutex); + if (disk->reserved_bytes < size) + { + disk->reserved_bytes = 0; + LOG_ERROR(&Poco::Logger::get("DiskCubeFS"), "Unbalanced reservations size for disk '{}'.", disk->getName()); + } + else + { + disk->reserved_bytes -= size; + } + + if (disk->reservation_count == 0) + LOG_ERROR(&Poco::Logger::get("DiskCubeFS"), "Unbalanced reservation count for disk '{}'.", disk->getName()); + else + --disk->reservation_count; + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + } + +private: + DiskCubeFSPtr disk; + UInt64 size; + CurrentMetrics::Increment metric_increment; +}; + + +ReservationPtr DiskCubeFS::reserve(UInt64 bytes) +{ + LOG_DEBUG(logger, "Got reserve request, bytes: {}", bytes); + if (!tryReserve(bytes)) + return {}; + return std::make_unique(std::static_pointer_cast(shared_from_this()), bytes); +} + +bool DiskCubeFS::tryReserve(UInt64 bytes) +{ + std::lock_guard lock(DiskCubeFS::reservation_mutex); + if (bytes == 0) + { + LOG_DEBUG(logger, "Reserving 0 bytes on disk {}", backQuote(name)); + ++reservation_count; + return true; + } + + auto available_space = getTotalSpace(); + UInt64 unreserved_space = available_space - std::min(available_space, reserved_bytes); + if (unreserved_space >= bytes) + { + LOG_DEBUG( + logger, "Reserving {} on disk {}, having unreserved {}.", ReadableSize(bytes), backQuote(name), ReadableSize(unreserved_space)); + ++reservation_count; + reserved_bytes += bytes; + return true; + } + LOG_DEBUG(logger, "DiskCubeFS::tryReserve return false"); + return false; +} + +UInt64 DiskCubeFS::getTotalSpace() const +{ + LOG_DEBUG(logger, "Use mock getTotalSpace function"); + return 419585155072; + //throwFromErrnoWithPath("DiskCubeFs does not support function getTotalSpace!", settings->disk_path, ErrorCodes::LOGICAL_ERROR); +} + +UInt64 DiskCubeFS::getAvailableSpace() const +{ + LOG_DEBUG(logger, "Use mock getAvailableSpace function"); + return 151697231872; + //throwFromErrnoWithPath("DiskCubeFs does not support function getAvailableSpace!", settings->disk_path, ErrorCodes::LOGICAL_ERROR); +} + +UInt64 DiskCubeFS::getUnreservedSpace() const +{ + LOG_DEBUG(logger, "Use mock getUnreservedSpace function"); + return 151697231872; + //throwFromErrnoWithPath("DiskCubeFs does not support function getUnreservedSpace!", settings->disk_path, ErrorCodes::LOGICAL_ERROR); +} + +bool DiskCubeFS::isFile(const String & path) const +{ + cfs_stat_info stat = getFileAttributes(path); + if (S_ISREG(stat.mode)) + return true; + + return false; +} + +bool DiskCubeFS::isDirectory(const String & path) const +{ + cfs_stat_info stat = getFileAttributes(path); + if (S_ISDIR(stat.mode)) + return true; + + return false; +} + +size_t DiskCubeFS::getFileSize(const String & path) const +{ + cfs_stat_info stat = getFileAttributes(path); + return stat.size; +} + +cfs_stat_info DiskCubeFS::getFileAttributes(const String & relative_path) const +{ + cfs_stat_info stat; + fs::path full_path = fs::path(settings->disk_path) / relative_path; + int result = sdk_loader->cfsGetattr(id, const_cast(full_path.c_str()), &stat); + if (result != 0) + { + int error_code = errno; + std::string error_message = strerror(error_code); + std::string full_error_message = "Failed to get file attribute: " + full_path.string() + ": " + error_message; + throwFromErrnoWithPath(full_error_message, full_path, ErrorCodes::CANNOT_STATVFS); + } + return stat; +} + +fs::path DiskCubeFS::getParentPath(const String & path) +{ + fs::path filePath(path); + fs::path parentPath = filePath.parent_path(); + + if (path.back() == '/' && parentPath != filePath) + { + parentPath = parentPath.parent_path(); + } + return parentPath; +} + +void DiskCubeFS::createDirectory(const String & path) +{ + fs::path parent_dir = getParentPath(path); + if (!exists(parent_dir)) + { + throwFromErrnoWithPath( + "Parent directory does not exist [" + parent_dir.string() + "]", parent_dir, ErrorCodes::DIRECTORY_DOESNT_EXIST); + } + fs::path full_path = fs::path(settings->disk_path) / path; + createDirectories(path); +} + +void DiskCubeFS::createDirectories(const String & path) +{ + fs::path full_path = fs::path(settings->disk_path) / path; + int result = sdk_loader->cfsMkdirs(id, const_cast(full_path.string().c_str()), O_RDWR | O_CREAT); + if (result != 0) + { + throwFromErrnoWithPath("Cannot mkdir: " + full_path.string(), full_path, ErrorCodes::CANNOT_CREATE_DIRECTORY); + } +} + +void DiskCubeFS::clearDirectory(const String & path) +{ + std::vector file_names; + listFiles(path, file_names); + for (const auto & filename : file_names) + { + fs::path file_path = fs::path(settings->disk_path) / filename; + + int result = sdk_loader->cfsUnlink(id, const_cast(file_path.string().c_str())); + if (result < 0) + { + throwFromErrnoWithPath("Cannot unlink file: " + file_path.string(), file_path, ErrorCodes::CANNOT_UNLINK); + } + } +} + +void DiskCubeFS::listFiles(const String & path, std::vector & file_names) +{ + file_names.clear(); + fs::path full_path = fs::path(settings->disk_path) / path; + int fd = -1; + fd = sdk_loader->cfsOpen(id, const_cast(full_path.string().c_str()), 0, 0); + if (fd < 0) + { + throwFromErrnoWithPath("Cannot open: " + full_path.string(), full_path, ErrorCodes::CANNOT_OPEN_FILE); + } + int count = default_readdir_count; + while (true) + { + std::vector direntsInfo(count); + std::memset(direntsInfo.data(), 0, count * sizeof(cfs_dirent_info)); + int num_entries = sdk_loader->cfsReaddir(id, fd, {direntsInfo.data(), count, count}, count); + if (num_entries < 0) + { + throwFromErrnoWithPath("Cannot readdir: " + full_path.string(), full_path, ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR); + } + if (num_entries < count) + { + for (int i = 0; i < num_entries; i++) + { + std::string file_name(direntsInfo[i].name); + file_names.emplace_back(file_name); + } + sdk_loader->cfsClose(id, fd); + return; + } + count = count * 2; + } +} + + +void DiskCubeFS::removeFile(const String & path) +{ + fs::path full_path = (fs::path(settings->disk_path) / path); + if (!fileExists(path)) + { + throwFromErrnoWithPath("File does not exists: " + full_path.string(), full_path, ErrorCodes::FILE_DOESNT_EXIST); + } + int result = sdk_loader->cfsUnlink(id, const_cast(full_path.c_str())); + if (result != 0) + { + throwFromErrnoWithPath("Cannot unlink file: " + full_path.string(), full_path, ErrorCodes::CANNOT_UNLINK); + } +} + +void DiskCubeFS::moveDirectory(const String & from_path, const String & to_path) +{ + moveFile(from_path, to_path); +} + +class DiskCubeFSDirectoryIterator final : public IDiskDirectoryIterator +{ +public: + DiskCubeFSDirectoryIterator() : id(-1), fd(-1), current_index(0) { } + DiskCubeFSDirectoryIterator(int64_t client_id, const String & path, std::shared_ptr sdk_loader_) + : id(client_id), dir_path(path), fd(-1), current_index(0), sdk_loader(sdk_loader_) + { + openDirectory(); + } + ~DiskCubeFSDirectoryIterator() override { closeDirectory(); } + void next() override { ++current_index; } + bool isValid() const override { return current_index < dirents.size(); } + String path() const override + { + if (isValid()) + return (fs::path(dir_path) / dirents[current_index]).string(); + else + return ""; + } + String name() const override + { + if (isValid()) + return dirents[current_index]; + else + return ""; + } + +private: + int64_t id; + String dir_path; + int fd; + std::vector dirents; + size_t current_index; + std::shared_ptr sdk_loader; + void openDirectory() + { + fd = sdk_loader->cfsOpen(id, const_cast(dir_path.c_str()), O_RDONLY | O_DIRECTORY, 0755); + if (fd < 0) + { + throwFromErrnoWithPath("Cannot open: " + dir_path, fs::path(dir_path), ErrorCodes::CANNOT_OPEN_FILE); + } + int count = default_readdir_count; + while (true) + { + std::vector direntsInfo(count); + std::memset(direntsInfo.data(), 0, count * sizeof(cfs_dirent_info)); + int num_entries = sdk_loader->cfsReaddir(id, fd, {direntsInfo.data(), count, count}, count); + if (num_entries < 0) + { + throwFromErrnoWithPath("Cannot readdir: " + dir_path, fs::path(dir_path), ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR); + } + if (num_entries < count) + { + for (int i = 0; i < num_entries; i++) + { + std::string file_name(direntsInfo[i].name); + dirents.emplace_back(file_name); + } + return; + } + count = count * 2; + } + } + void closeDirectory() + { + if (fd > 0) + sdk_loader->cfsClose(id, fd); + } +}; + +DiskDirectoryIteratorPtr DiskCubeFS::iterateDirectory(const String & path) +{ + fs::path meta_path = fs::path(settings->disk_path) / path; + if (!broken && directoryExists(meta_path)) + return std::make_unique(id, meta_path.string(), sdk_loader); + else + return std::make_unique(); +} + +void DiskCubeFS::createFile(const String & path) +{ + fs::path parent_dir = getParentPath(path); + if (!exists(parent_dir)) + { + throwFromErrnoWithPath( + "Parent directory does not exist [" + parent_dir.string() + "]", parent_dir, ErrorCodes::DIRECTORY_DOESNT_EXIST); + } + fs::path full_path = fs::path(settings->disk_path) / path; + int fd = sdk_loader->cfsOpen( + id, + const_cast(full_path.string().c_str()), + O_WRONLY | O_CREAT | O_EXCL, + S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH); + if (fd < 0) + { + throwFromErrnoWithPath("Cannot open: " + full_path.string(), full_path, ErrorCodes::CANNOT_OPEN_FILE); + } + sdk_loader->cfsClose(id, fd); +} + +void DiskCubeFS::replaceFile(const String & from_path, const String & to_path) +{ + fs::path parent_dir = getParentPath(to_path); + if (!exists(parent_dir)) + { + throwFromErrnoWithPath( + "Destination directory does not exist [" + parent_dir.string() + "]", parent_dir, ErrorCodes::DIRECTORY_DOESNT_EXIST); + } + fs::path full_from_path = fs::path(settings->disk_path) / from_path; + fs::path full_to_path = fs::path(settings->disk_path) / to_path; + int result + = sdk_loader->cfsRename(id, const_cast(full_from_path.string().c_str()), const_cast(full_to_path.string().c_str())); + if (result != 0) + { + throwFromErrnoWithPath( + "Failed to rename file, from path: " + full_from_path.string() + " to path: " + full_to_path.string(), + "", + ErrorCodes::ATOMIC_RENAME_FAIL); + } +} + +void DiskCubeFS::moveFile(const String & from_path, const String & to_path) +{ + if (exists(to_path)) + { + throwFromErrnoWithPath( + "Destination file already exists [" + (fs::path(settings->disk_path) / to_path).string() + "]", + fs::path(settings->disk_path) / to_path, + ErrorCodes::FILE_ALREADY_EXISTS); + } + replaceFile(from_path, to_path); +} + +void DiskCubeFS::removeFileIfExists(const String & path) +{ + if (fileExists(path)) + { + auto fs_path = fs::path(settings->disk_path) / path; + if (0 != sdk_loader->cfsUnlink(id, const_cast(fs_path.c_str()))) + throwFromErrnoWithPath("Cannot unlink file " + fs_path.string(), fs_path, ErrorCodes::CANNOT_UNLINK); + } +} + +void DiskCubeFS::removeDirectory(const String & path) +{ + auto fs_path = fs::path(settings->disk_path) / path; + if (0 != sdk_loader->cfsRmdir(id, const_cast(fs_path.c_str()))) + { + int error_code = errno; + std::string error_message = strerror(error_code); + std::string full_error_message = "Cannot rmdir " + fs_path.string() + ": " + error_message; + throwFromErrnoWithPath(full_error_message, fs_path, ErrorCodes::CANNOT_RMDIR); + } +} + +void DiskCubeFS::removeRecursive(const String & path) +{ + cfs_stat_info stat = getFileAttributes(path); + if (S_ISDIR(stat.mode)) + { + std::vector file_names; + listFiles(path, file_names); + for (const auto & filename : file_names) + { + auto child_path = fs::path(path) / filename; + removeRecursive(child_path.string()); + } + removeDirectory(path); + } + else + { + removeFile(path); + } +} + +//warning: The file stat information is cached locally and may not be updated immediately +void DiskCubeFS::setLastModified(const String & path, const Poco::Timestamp & timestamp) +{ + constexpr uint32_t AttrModifyTime = 1 << 3; + constexpr uint32_t AttrAccessTime = 1 << 4; + cfs_stat_info stat = getFileAttributes(path); + stat.atime = timestamp.epochTime(); + stat.mtime = timestamp.epochTime(); + fs::path full_path = fs::path(settings->disk_path) / path; + if (sdk_loader->cfsSetattr(id, const_cast(full_path.string().c_str()), &stat, AttrModifyTime | AttrAccessTime) != 0) + { + throwFromErrnoWithPath("Cannot setattr " + full_path.string(), full_path, ErrorCodes::LOGICAL_ERROR); + } +} + +Poco::Timestamp DiskCubeFS::getLastModified(const String & path) +{ + return Poco::Timestamp::fromEpochTime(getLastChanged(path)); +} + +time_t DiskCubeFS::getLastChanged(const String & path) const +{ + cfs_stat_info stat = getFileAttributes(path); + return stat.ctime; +} + +void DiskCubeFS::setReadOnly(const String & path) +{ + cfs_stat_info stat = getFileAttributes(path); + fs::path full_path = fs::path(settings->disk_path) / path; + int fd = sdk_loader->cfsOpen(id, const_cast(full_path.string().c_str()), 0, 0); + if (fd < 0) + { + throwFromErrnoWithPath("Cannot open: " + full_path.string(), full_path, ErrorCodes::CANNOT_OPEN_FILE); + } + mode_t newMode = stat.mode & ~(S_IWUSR | S_IWGRP | S_IWOTH); + if (sdk_loader->cfsFchmod(id, fd, newMode) != 0) + { + throwFromErrnoWithPath("Cannot fchmod " + full_path.string(), full_path, ErrorCodes::LOGICAL_ERROR); + } +} + +void DiskCubeFS::createHardLink(const String &, const String &) +{ + throwFromErrnoWithPath("DiskCubeFs does not support function createHardLink!", settings->disk_path, ErrorCodes::LOGICAL_ERROR); +} + +DiskCubeFS::DiskCubeFS(const String & name_, SettingsPtr settings_) + : name(name_), settings(std::move(settings_)), logger(&Poco::Logger::get("DiskCubeFS")) +{ + sdk_loader = std::make_shared(settings->lib_path); + id = sdk_loader->cfsNewClient(); + if (id <= 0) + { + throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to create client"); + } + setClientInfo("volName", const_cast(settings->vol_name.data())); + setClientInfo("masterAddr", const_cast(settings->master_addr.data())); + setClientInfo("logDir", const_cast(settings->log_dir.data())); + setClientInfo("logLevel", const_cast(settings->log_level.data())); + setClientInfo("accessKey", const_cast(settings->access_key.data())); + setClientInfo("secretKey", const_cast(settings->secret_key.data())); + setClientInfo("pushAddr", const_cast(settings->push_addr.data())); + if (sdk_loader->cfsStartClient(id) != 0) + { + throwFromErrnoWithPath("Start cfs client failed", "", ErrorCodes::LOGICAL_ERROR); + } + createDirectories(""); +} + +DiskCubeFS::DiskCubeFS(const String & name_, ContextPtr, SettingsPtr settings_) : DiskCubeFS(name_, settings_) +{ +} + +void DiskCubeFS::getClientId() +{ + id = sdk_loader->cfsNewClient(); + if (id <= 0) + { + throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to create client"); + } +} + +void DiskCubeFS::setClientInfo(const char * key, char * value) +{ + if (sdk_loader->cfsSetClient(id, strdup(key), value) != 0) + { + sdk_loader->cfsCloseClient(id); + throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to set client info"); + } +} + +DiskCubeFSSettings ::DiskCubeFSSettings( + String vol_name_, + String master_addr_, + String log_dir_, + String log_level_, + String access_key_, + String secret_key_, + String push_addr_, + String disk_path_, + String lib_path_) + : vol_name(vol_name_) + , master_addr(master_addr_) + , log_dir(log_dir_) + , log_level(log_level_) + , access_key(access_key_) + , secret_key(secret_key_) + , push_addr(push_addr_) + , disk_path(disk_path_) + , lib_path(lib_path_) +{ +} + +bool DiskCubeFS::canRead(const std::string & path) +{ + cfs_stat_info stat = getFileAttributes(path); + if (stat.uid == geteuid()) + return (stat.mode & S_IRUSR) != 0; + else if (stat.gid == getegid()) + return (stat.mode & S_IRGRP) != 0; + else + return (stat.mode & S_IROTH) != 0 || geteuid() == 0; +} + +bool DiskCubeFS::exists(const String & path) const +{ + fs::path full_path = fs::path(settings->disk_path) / path; + cfs_stat_info stat; + int result = sdk_loader->cfsGetattr(id, const_cast(full_path.string().c_str()), &stat); + return (result == 0); +} + +bool DiskCubeFS::fileExists(const String & path) const +{ + fs::path full_path = fs::path(settings->disk_path) / path; + cfs_stat_info stat; + int result = sdk_loader->cfsGetattr(id, const_cast(full_path.string().c_str()), &stat); + return (result == 0) && (S_ISREG(stat.mode)); +} + +bool DiskCubeFS::directoryExists(const String & path) const +{ + fs::path full_path = fs::path(settings->disk_path) / path; + cfs_stat_info stat; + int result = sdk_loader->cfsGetattr(id, const_cast(full_path.string().c_str()), &stat); + return (result == 0) && (S_ISDIR(stat.mode)); +} + +std::optional DiskCubeFS::fileSizeSafe(const fs::path & path) +{ + cfs_stat_info stat; + int result = sdk_loader->cfsGetattr(id, const_cast(path.c_str()), &stat); + if (result == 0) + { + return stat.size; + } + else if (result == -ENOENT || result == -ENOTDIR) + { + return std::nullopt; + } + else + { + throw std::runtime_error("Failed to get file size: " + path.string()); + } +} + +std::unique_ptr +DiskCubeFS::readFile(const String & path, const ReadSettings &, std::optional, std::optional) const +{ + if (!fileExists(path)) + { + fs::path full_path = fs::path(settings->disk_path) / path; + throwFromErrnoWithPath("File does not exists: " + full_path.string(), full_path, ErrorCodes::FILE_DOESNT_EXIST); + } + return std::make_unique(id, fs::path(settings->disk_path) / path, sdk_loader, O_RDONLY | O_CLOEXEC); +} + +// std::optional DiskCubeFS::readDiskCheckerMagicNumber() const noexcept +// { +// try +// { +// auto buf = readFile(disk_checker_path); +// UInt32 magic_number; +// readIntBinary(magic_number, *buf); +// if (buf->eof()) +// return magic_number; +// LOG_WARNING(logger, "The size of disk check magic number is more than 4 bytes. Mark it as read failure"); +// return {}; +// } +// catch (...) +// { +// tryLogCurrentException( +// logger, fmt::format("Cannot read correct disk check magic number from from {}{}", settings->disk_path, disk_checker_path)); +// return {}; +// } +// } + +std::unique_ptr DiskCubeFS::writeFile(const String & path, size_t buf_size, WriteMode mode) +{ + int flags = (mode == WriteMode::Append) ? (O_APPEND | O_CREAT | O_WRONLY) : -1; + return std::make_unique(id, fs::path(settings->disk_path) / path, sdk_loader, buf_size, flags); +} +} diff --git a/src/Disks/CubeFS/DiskCubeFS.h b/src/Disks/CubeFS/DiskCubeFS.h new file mode 100644 index 000000000000..bcdc239b7a11 --- /dev/null +++ b/src/Disks/CubeFS/DiskCubeFS.h @@ -0,0 +1,110 @@ +#pragma once + +#include +#include "sdkLoader.h" + +namespace DB +{ +struct DiskCubeFSSettings +{ + DiskCubeFSSettings( + String vol_name_, + String master_addr_, + String log_dir_, + String log_level_, + String access_key_, + String secret_key_, + String push_addr_, + String disk_path_, + String lib_path_); + String vol_name; + String master_addr; + String log_dir; + String log_level; + String access_key; + String secret_key; + String push_addr; + String disk_path; + String lib_path; +}; + +class DiskCubeFS final : public IDisk +{ +public: + friend class DiskCubeFSReservation; + using SettingsPtr = std::shared_ptr; + + DiskCubeFS(const String & name_, SettingsPtr settings_); + DiskCubeFS(const String & name_, ContextPtr context, SettingsPtr settings_); + std::unique_ptr + readFile(const String & path, const ReadSettings & settings, std::optional read_hint, std::optional file_size) + const override; + std::unique_ptr writeFile(const String & path, size_t buf_size, WriteMode mode) override; + bool exists(const String & path) const override; + const String & getName() const override { return name; } + const String & getPath() const override { return settings->disk_path; } + ReservationPtr reserve(UInt64 bytes) override; + UInt64 getTotalSpace() const override; + UInt64 getUnreservedSpace() const override; + bool isFile(const String & path) const override; + bool isDirectory(const String & path) const override; + size_t getFileSize(const String & path) const override; + //differs from createDirectories, an error will be reported if the parent directory does not exist. + void createDirectory(const String & path) override; + void createDirectories(const String & path) override; + /* + Delete all files under the folder + Empty subfolders can be deleted, but if there are files in the subfolders, an error will be reported.. + */ + void clearDirectory(const String & path) override; + /* + Follow fs::rename semantics. If the upper-level directory on the destination does not exist, an error will be reported. + By modifying the cubefs code, the cfs_rename method supports overwriting the destination file. + */ + void moveDirectory(const String & from_path, const String & to_path) override; + //differs from replaceFile, will not overwrite the destination file + void moveFile(const String & from_path, const String & to_path) override; + void replaceFile(const String & from_path, const String & to_path) override; + DiskDirectoryIteratorPtr iterateDirectory(const String & path) override; + //If the parent directory does not exist, an error will be reported. + void createFile(const String & path) override; + void listFiles(const String & path, std::vector & file_names) override; + void removeFile(const String & path) override; + void removeFileIfExists(const String & path) override; + void removeDirectory(const String & path) override; + void setLastModified(const String & path, const Poco::Timestamp & timestamp) override; + Poco::Timestamp getLastModified(const String & path) override; + time_t getLastChanged(const String & path) const override; + void setReadOnly(const String & path) override; + void createHardLink(const String & src_path, const String & dst_path) override; + DiskType getType() const override { return DiskType::CubeFS; } + bool isRemote() const override { return true; } + bool supportZeroCopyReplication() const override { return false; } + void removeRecursive(const String & path) override; + UInt64 getAvailableSpace() const override; + +private: + bool fileExists(const String & path) const; + bool directoryExists(const String & path) const; + cfs_stat_info getFileAttributes(const String & relative_path) const; + bool canRead(const std::string & path); + std::optional fileSizeSafe(const fs::path & path); + bool tryReserve(UInt64 bytes); + void getClientId(); + void setClientInfo(const char * key, char * value); + fs::path getParentPath(const String & path); + + const String name; + SettingsPtr settings; + Poco::Logger * logger; + static std::mutex reservation_mutex; + //std::atomic keep_free_space_bytes; + std::atomic broken{false}; + std::atomic readonly{false}; + UInt64 reserved_bytes = 0; + //const String disk_checker_path = ".disk_checker_file"; + UInt64 reservation_count = 0; + int64_t id; + std::shared_ptr sdk_loader; +}; +} diff --git a/src/Disks/CubeFS/registerDiskCubeFS.cpp b/src/Disks/CubeFS/registerDiskCubeFS.cpp new file mode 100644 index 000000000000..c330338bd24b --- /dev/null +++ b/src/Disks/CubeFS/registerDiskCubeFS.cpp @@ -0,0 +1,49 @@ +#include +#include "DiskCubeFS.h" +#include "Disks/DiskFactory.h" +#include "Disks/DiskRestartProxy.h" +namespace DB +{ +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + + DiskCubeFS::SettingsPtr getSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix) + { + LOG_DEBUG(&Poco::Logger::get("DiskCubeFS"), "volumn name: {}", config.getString(config_prefix + ".vol_name")); + LOG_DEBUG(&Poco::Logger::get("DiskCubeFS"), "master address: {}", config.getString(config_prefix + ".master_addr")); + LOG_DEBUG(&Poco::Logger::get("DiskCubeFS"), "log directory: {}", config.getString(config_prefix + ".log_dir")); + LOG_DEBUG(&Poco::Logger::get("DiskCubeFS"), "log level: {}", config.getString(config_prefix + ".log_level")); + LOG_DEBUG(&Poco::Logger::get("DiskCubeFS"), "access key: {}", config.getString(config_prefix + ".access_key")); + LOG_DEBUG(&Poco::Logger::get("DiskCubeFS"), "secret key: {}", config.getString(config_prefix + ".secret_key")); + LOG_DEBUG(&Poco::Logger::get("DiskCubeFS"), "push address: {}", config.getString(config_prefix + ".push_addr")); + LOG_DEBUG(&Poco::Logger::get("DiskCubeFS"), "disk path: {}", config.getString(config_prefix + ".disk_path")); + LOG_DEBUG(&Poco::Logger::get("DiskCubeFS"), "lib path: {}", config.getString(config_prefix + ".lib_path")); + return std::make_shared( + config.getString(config_prefix + ".vol_name"), + config.getString(config_prefix + ".master_addr"), + config.getString(config_prefix + ".log_dir"), + config.getString(config_prefix + ".log_level", "debug"), + config.getString(config_prefix + ".access_key"), + config.getString(config_prefix + ".secret_key"), + config.getString(config_prefix + ".push_addr"), + config.getString(config_prefix + ".disk_path"), + config.getString(config_prefix + ".lib_path")); + } + +void registerDiskCubeFS(DiskFactory & factory) +{ + auto creator = [](const String & name, + const Poco::Util::AbstractConfiguration & config, + const String & config_prefix, + ContextPtr context, + const DisksMap & /*map*/) -> DiskPtr + { + auto settings = getSettings(config, config_prefix); + std::shared_ptr cubeFSdisk = std::make_shared(name, context, settings); + return std::make_shared(cubeFSdisk); + }; + factory.registerDiskType("cubefs", creator); +} +} diff --git a/src/Disks/CubeFS/sdkLoader.cpp b/src/Disks/CubeFS/sdkLoader.cpp new file mode 100644 index 000000000000..188f47e81d4a --- /dev/null +++ b/src/Disks/CubeFS/sdkLoader.cpp @@ -0,0 +1,48 @@ +#include "sdkLoader.h" +#include +#include +namespace DB +{ +namespace ErrorCodes +{ + extern const int EXTERNAL_LIBRARY_ERROR; +} +SdkLoader::SdkLoader(String lib_path_) : lib_path(lib_path_), logger(&Poco::Logger::get("sdkLoader")) +{ + LOG_DEBUG(logger, "load cubefs shared library from path: {}", lib_path_); + void * handle = dlopen(lib_path_.c_str(), RTLD_LAZY); + if (!handle) + { + throwFromErrnoWithPath("Failed to open the dynamic library", lib_path_, ErrorCodes::EXTERNAL_LIBRARY_ERROR); + } + cfsNewClient = reinterpret_cast(dlsym(handle, "cfs_new_client")); + cfsSetClient = reinterpret_cast(dlsym(handle, "cfs_set_client")); + cfsStartClient = reinterpret_cast(dlsym(handle, "cfs_start_client")); + cfsCloseClient = reinterpret_cast(dlsym(handle, "cfs_close_client")); + cfsChdir = reinterpret_cast(dlsym(handle, "cfs_chdir")); + cfsGetcwd = reinterpret_cast(dlsym(handle, "cfs_getcwd")); + cfsGetattr = reinterpret_cast(dlsym(handle, "cfs_getattr")); + cfsSetattr = reinterpret_cast(dlsym(handle, "cfs_setattr")); + cfsOpen = reinterpret_cast(dlsym(handle, "cfs_open")); + cfsFlush = reinterpret_cast(dlsym(handle, "cfs_flush")); + cfsClose = reinterpret_cast(dlsym(handle, "cfs_close")); + cfsLsdir = reinterpret_cast(dlsym(handle, "cfs_lsdir")); + cfsMkdirs = reinterpret_cast(dlsym(handle, "cfs_mkdirs")); + cfsRmdir = reinterpret_cast(dlsym(handle, "cfs_rmdir")); + cfsUnlink = reinterpret_cast(dlsym(handle, "cfs_unlink")); + cfsRename = reinterpret_cast(dlsym(handle, "cfs_rename")); + cfsFchmod = reinterpret_cast(dlsym(handle, "cfs_fchmod")); + cfsGetsummary = reinterpret_cast(dlsym(handle, "cfs_getsummary")); + cfsWrite = reinterpret_cast(dlsym(handle, "cfs_write")); + cfsRead = reinterpret_cast(dlsym(handle, "cfs_read")); + cfsReaddir = reinterpret_cast(dlsym(handle, "cfs_readdir")); + + if (!cfsNewClient || !cfsSetClient || !cfsStartClient || !cfsCloseClient || !cfsChdir || !cfsGetcwd || !cfsGetattr || !cfsSetattr + || !cfsOpen || !cfsFlush || !cfsClose || !cfsWrite || !cfsRead || !cfsLsdir || !cfsMkdirs || !cfsRmdir || !cfsUnlink || !cfsRename + || !cfsFchmod || !cfsGetsummary || !cfsReaddir) + { + dlclose(handle); + throwFromErrnoWithPath("Failed to find one or more functions", "", ErrorCodes::EXTERNAL_LIBRARY_ERROR); + } +} +} diff --git a/src/Disks/CubeFS/sdkLoader.h b/src/Disks/CubeFS/sdkLoader.h new file mode 100644 index 000000000000..77c35527fe1b --- /dev/null +++ b/src/Disks/CubeFS/sdkLoader.h @@ -0,0 +1,63 @@ +#pragma once +#include +#include +#include + +namespace DB +{ +class SdkLoader +{ +public: + SdkLoader(String lib_path_); + + typedef int64_t (*CfsNewClientFunction)(); + typedef int (*CfsSetClientFunction)(int64_t, char *, char *); + typedef int (*CfsStartClientFunction)(int64_t); + typedef void (*CfsCloseClientFunction)(int64_t); + typedef int (*CfsChdirFunction)(int64_t, char *); + typedef char * (*CfsGetcwdFunction)(int64_t); + typedef int (*CfsGetattrFunction)(int64_t, char *, struct cfs_stat_info *); + typedef int (*CfsSetattrFunction)(int64_t, char *, struct cfs_stat_info *, int); + typedef int (*CfsOpenFunction)(int64_t, char *, int, mode_t); + typedef int (*CfsFlushFunction)(int64_t, int); + typedef void (*CfsCloseFunction)(int64_t, int); + typedef ssize_t (*CfsWriteFunction)(int64_t, int, void *, size_t, off_t); + typedef ssize_t (*CfsReadFunction)(int64_t, int, void *, size_t, off_t); + //typedef int (*CfsBatchGetInodesFunction)(int64_t, int, void *, GoSlice, int); + //typedef int (*CfsRefreshSummaryFunction)(int64_t, char *, int); + typedef int (*CfsReaddirFunction)(int64_t, int, GoSlice, int); + typedef int (*CfsLsdirFunction)(int64_t, int, GoSlice, int); + typedef int (*CfsMkdirsFunction)(int64_t, char *, mode_t); + typedef int (*CfsRmdirFunction)(int64_t, char *); + typedef int (*CfsUnlinkFunction)(int64_t, char *); + typedef int (*CfsRenameFunction)(int64_t, char *, char *); + typedef int (*CfsFchmodFunction)(int64_t, int, mode_t); + typedef int (*CfsGetsummaryFunction)(int64_t, char *, struct cfs_summary_info *, char *, int); + + CfsNewClientFunction cfsNewClient; + CfsSetClientFunction cfsSetClient; + CfsStartClientFunction cfsStartClient; + CfsCloseClientFunction cfsCloseClient; + CfsChdirFunction cfsChdir; + CfsGetcwdFunction cfsGetcwd; + CfsGetattrFunction cfsGetattr; + CfsSetattrFunction cfsSetattr; + CfsOpenFunction cfsOpen; + CfsFlushFunction cfsFlush; + CfsCloseFunction cfsClose; + CfsLsdirFunction cfsLsdir; + CfsMkdirsFunction cfsMkdirs; + CfsRmdirFunction cfsRmdir; + CfsUnlinkFunction cfsUnlink; + CfsRenameFunction cfsRename; + CfsFchmodFunction cfsFchmod; + CfsGetsummaryFunction cfsGetsummary; + CfsWriteFunction cfsWrite; + CfsReadFunction cfsRead; + CfsReaddirFunction cfsReaddir; + +private: + String lib_path; + Poco::Logger * logger; +}; +} diff --git a/src/Disks/DiskType.h b/src/Disks/DiskType.h index 435f427b05a0..05b4f0f46703 100644 --- a/src/Disks/DiskType.h +++ b/src/Disks/DiskType.h @@ -14,6 +14,7 @@ enum class DiskType Encrypted, WebServer, AzureBlobStorage, + CubeFS, }; inline String toString(DiskType disk_type) @@ -34,6 +35,8 @@ inline String toString(DiskType disk_type) return "web"; case DiskType::AzureBlobStorage: return "azure_blob_storage"; + case DiskType::CubeFS: + return "cubefs"; } __builtin_unreachable(); } diff --git a/src/Disks/registerDisks.cpp b/src/Disks/registerDisks.cpp index 88c3fdde1e06..47843ed943f4 100644 --- a/src/Disks/registerDisks.cpp +++ b/src/Disks/registerDisks.cpp @@ -9,7 +9,7 @@ namespace DB void registerDiskLocal(DiskFactory & factory); void registerDiskMemory(DiskFactory & factory); - +void registerDiskCubeFS(DiskFactory & factory); #if USE_AWS_S3 void registerDiskS3(DiskFactory & factory); #endif @@ -35,6 +35,7 @@ void registerDisks() registerDiskLocal(factory); registerDiskMemory(factory); + registerDiskCubeFS(factory); #if USE_AWS_S3 registerDiskS3(factory); diff --git a/src/Disks/tests/gtest_disk_cubefs.cpp b/src/Disks/tests/gtest_disk_cubefs.cpp new file mode 100644 index 000000000000..7b5dfb0e1481 --- /dev/null +++ b/src/Disks/tests/gtest_disk_cubefs.cpp @@ -0,0 +1,256 @@ +#include +#include +#include +#include +#include +#define RUN_CUBEFS_TEST 1 +#if RUN_CUBEFS_TEST + +# include +# include +# include +# include +# include +# include + +using namespace DB; + +const String path = "/test_dir"; +const String vol_name = "xieyichen"; +const String master_addr = "cfs-south.oppo.local"; +const String log_dir = "/home/xieyichen/logs/cfs2/test-log"; +const String log_level = "debug"; +const String access_key = "jRlZO65q7XlH5bnV"; +const String secret_key = "V1m730UzREHaK1jCkC0kL0cewOX0kH3K"; +const String push_addr = "cfs.dg-push.wanyol.com"; +const String disk_name = "cubefs"; +const String lib_path = "/home/xieyichen/ClickHouse/libcfs.so"; + +class DiskTestCubeFS : public testing::Test +{ +public: + void SetUp() override + { + auto settings = std::make_shared( + vol_name, master_addr, log_dir, log_level, access_key, secret_key, push_addr, path, lib_path); + std::cout << "set client info successfully " << std::endl; + disk = std::make_shared(disk_name, settings); + std::cout << "initial cubefs disk successfully " << std::endl; + } + void TearDown() override + { + disk->removeRecursive(""); + disk.reset(); + } + DB::DiskPtr disk; +}; + +TEST_F(DiskTestCubeFS, createDirectory) +{ + EXPECT_TRUE(this->disk->exists("/")); + std::cout << "empty exist is ok" << std::endl; + EXPECT_EQ(disk_name, this->disk->getName()); + + disk->createDirectory("create_directory"); + EXPECT_TRUE(this->disk->exists("create_directory")); + EXPECT_TRUE(this->disk->isDirectory("create_directory")); + EXPECT_FALSE(this->disk->isFile("create_directory")); + + this->disk->createDirectories("not_exist_parent_directory/subdirectory"); + EXPECT_TRUE(this->disk->isDirectory("not_exist_parent_directory/subdirectory")); + std::cout << "class disk path: " << disk->getPath() << std::endl; +} + + +TEST_F(DiskTestCubeFS, moveDirectory) +{ + disk->createDirectory("create_directory"); + disk->createDirectories("not_exist_parent_directory/subdirectory"); + try + { + std::cout << "try move directory" << std::endl; + this->disk->moveDirectory("not_exist_parent_directory/subdirectory", "not_exist/create_directory/"); + FAIL() << "Expected exception to be thrown."; + //EXPECT_THROW(cubeFSdisk->moveDirectory("not_exist_parent_directory/subdirectory", "not_exist/create_directory/"), std::exception); + } + catch (const std::exception & e) + { + std::string errorMessage = e.what(); + std::string expectedSubstring = "Destination directory does not exist"; + EXPECT_TRUE(errorMessage.find(expectedSubstring) != std::string::npos); + } + std::cout << "try move directory end" << std::endl; + this->disk->moveDirectory("not_exist_parent_directory/subdirectory", "create_directory/subdirectory"); + EXPECT_TRUE(this->disk->isDirectory("create_directory/subdirectory")); + std::cout << "move directory end" << std::endl; +} + +TEST_F(DiskTestCubeFS, createFile) +{ + try + { + disk->createFile("create_directory/file"); + FAIL() << "Expected exception to be thrown."; + } + catch (const std::exception & e) + { + std::string errorMessage = e.what(); + std::string expectedSubstring = "Parent directory does not exist"; + EXPECT_TRUE(errorMessage.find(expectedSubstring) != std::string::npos); + } + disk->createDirectory("create_directory"); + + this->disk->createFile("create_directory/file"); + EXPECT_TRUE(this->disk->isFile("create_directory/file")); + EXPECT_FALSE(this->disk->isDirectory("create_directory/file")); + std::cout << "create file end" << std::endl; +} + +TEST_F(DiskTestCubeFS, moveFile) +{ + disk->createDirectory("create_directory"); + disk->createFile("create_directory/file"); + this->disk->moveDirectory("create_directory", "move_directory"); + EXPECT_TRUE(this->disk->isFile("move_directory/file")); + std::cout << "move directory end" << std::endl; + this->disk->moveFile("move_directory/file", "move_file"); + EXPECT_TRUE(this->disk->isFile("move_file")); + disk->createDirectory("create_directory"); + this->disk->createFile("create_directory/create_file"); + try + { + this->disk->moveFile("move_file", "create_directory/create_file"); + FAIL() << "Expected exception to be thrown."; + } + catch (const std::exception & e) + { + std::string errorMessage = e.what(); + std::string expectedSubstring = "Destination file already exists"; + EXPECT_TRUE(errorMessage.find(expectedSubstring) != std::string::npos); + } + EXPECT_TRUE(disk->exists("create_directory/create_file")); + std::cout << "start replace file" << std::endl; + disk->replaceFile("move_file", "create_directory/create_file"); + EXPECT_TRUE(this->disk->isFile("create_directory/create_file")); +} + +TEST_F(DiskTestCubeFS, remove) +{ + try + { + disk->removeFile("not_exist"); + FAIL() << "Expected exception to be thrown."; + } + catch (const std::exception & e) + { + std::string errorMessage = e.what(); + std::string expectedSubstring = "File does not exists"; + EXPECT_TRUE(errorMessage.find(expectedSubstring) != std::string::npos); + } + disk->removeFileIfExists("not_exist"); +} + +/* The file stat information is cached locally and may not be updated immediately +TEST_F(DiskTestCubeFS, lastModified) +{ + disk->createFile("file"); + std::this_thread::sleep_for(std::chrono::seconds(1)); + Poco::Timestamp timestamp; + disk->setLastModified("file", timestamp); + EXPECT_EQ(timestamp.epochTime(), disk->getLastModified("file").epochTime()); +} +*/ + +TEST_F(DiskTestCubeFS, writeFile) +{ + { + std::unique_ptr out = this->disk->writeFile("test_file"); + writeString("test data", *out); + } + + DB::String data; + { + std::unique_ptr in = this->disk->readFile("test_file"); + readString(data, *in); + } + + EXPECT_EQ("test data", data); + EXPECT_EQ(data.size(), this->disk->getFileSize("test_file")); +} + +TEST_F(DiskTestCubeFS, readFile) +{ + try + { + std::unique_ptr in = this->disk->readFile("test_file"); + FAIL() << "Expected exception to be thrown."; + } + catch (const std::exception & e) + { + std::string errorMessage = e.what(); + std::string expectedSubstring = "File does not exists"; + EXPECT_TRUE(errorMessage.find(expectedSubstring) != std::string::npos); + } + + { + std::unique_ptr out = this->disk->writeFile("test_file"); + writeString("test data", *out); + } + + // Test SEEK_SET + { + String buf(4, '0'); + std::unique_ptr in = this->disk->readFile("test_file"); + + in->seek(5, SEEK_SET); + + in->readStrict(buf.data(), 4); + EXPECT_EQ("data", buf); + } + + // Test SEEK_CUR + { + std::unique_ptr in = this->disk->readFile("test_file"); + String buf(4, '0'); + + in->readStrict(buf.data(), 4); + EXPECT_EQ("test", buf); + + // Skip whitespace + in->seek(1, SEEK_CUR); + + in->readStrict(buf.data(), 4); + EXPECT_EQ("data", buf); + } +} + +TEST_F(DiskTestCubeFS, iterateDirectory) +{ + this->disk->createDirectories("test_dir/nested_dir/"); + + { + auto iter = this->disk->iterateDirectory(""); + EXPECT_TRUE(iter->isValid()); + EXPECT_EQ("/test_dir/test_dir", iter->path()); + iter->next(); + EXPECT_FALSE(iter->isValid()); + } + + { + auto iter = this->disk->iterateDirectory("test_dir/"); + EXPECT_TRUE(iter->isValid()); + EXPECT_EQ("/test_dir/test_dir/nested_dir", iter->path()); + iter->next(); + EXPECT_FALSE(iter->isValid()); + } +} + +TEST_F(DiskTestCubeFS, others) +{ + std::cout << "getAvailableSpace result: " << disk->getAvailableSpace() << std::endl; + disk->createFile("file"); + disk->setReadOnly("file"); + std::cout << "getLastChanged result: " << disk->getLastChanged("file") << std::endl; +} + +#endif diff --git a/src/IO/ReadBufferFromCubeFS.cpp b/src/IO/ReadBufferFromCubeFS.cpp new file mode 100644 index 000000000000..07996fc4cbcb --- /dev/null +++ b/src/IO/ReadBufferFromCubeFS.cpp @@ -0,0 +1,195 @@ +#include +namespace ProfileEvents +{ +extern const Event ReadBufferFromFileDescriptorRead; +extern const Event ReadBufferFromFileDescriptorReadFailed; +extern const Event ReadBufferFromFileDescriptorReadBytes; +extern const Event DiskReadElapsedMicroseconds; +extern const Event Seek; +} + +namespace DB +{ +namespace ErrorCodes +{ + extern const int FILE_DOESNT_EXIST; + extern const int CANNOT_OPEN_FILE; + extern const int CANNOT_CLOSE_FILE; +} + +ReadBufferFromCubeFS::~ReadBufferFromCubeFS() +{ + if (fd < 0) + return; + + sdk_loader->cfsClose(id, fd); +} + +void ReadBufferFromCubeFS::close() +{ + if (fd < 0) + return; + try + { + sdk_loader->cfsClose(id, fd); + } + catch (...) + { + throw Exception("Cannot close file", ErrorCodes::CANNOT_CLOSE_FILE); + } + fd = -1; + //metric_increment.destroy(); +} + +off_t ReadBufferFromCubeFS::size() +{ + struct cfs_stat_info file_info; + int result = sdk_loader->cfsGetattr(id, const_cast(file_name.data()), &file_info); + if (result != 0) + { + // Handle the error (throw an exception, return an error code, etc.) + throwFromErrnoWithPath("Cannot execute fstat " + getFileName(), getFileName(), ErrorCodes::CANNOT_FSTAT); + } + return file_info.size; +} + +bool ReadBufferFromCubeFS::nextImpl() +{ + /// If internal_buffer size is empty, then read() cannot be distinguished from EOF + assert(!internal_buffer.empty()); + /// This is a workaround of a read pass EOF bug in linux kernel with pread() + if (file_size.has_value() && file_offset_of_buffer_end >= *file_size) + return false; + size_t bytes_read = 0; + while (!bytes_read) + { + //ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorRead); + + //Stopwatch watch(profile_callback ? clock_type : CLOCK_MONOTONIC); + + ssize_t res = 0; + { + //CurrentMetrics::Increment metric_increment{CurrentMetrics::Read}; + res = sdk_loader->cfsRead(id, fd, internal_buffer.begin(), internal_buffer.size(), file_offset_of_buffer_end); + } + if (!res) + break; + + if (-1 == res && errno != EINTR) + { + //ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadFailed); + throwFromErrnoWithPath("Cannot read from file " + getFileName(), getFileName(), ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR); + } + + if (res > 0) + bytes_read += res; + + /// It reports real time spent including the time spent while thread was preempted doing nothing. + /// And it is Ok for the purpose of this watch (it is used to lower the number of threads to read from tables). + /// Sometimes it is better to use taskstats::blkio_delay_total, but it is quite expensive to get it + /// (TaskStatsInfoGetter has about 500K RPS). + //watch.stop(); + // ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds()); + + // if (profile_callback) + // { + // ProfileInfo info; + // info.bytes_requested = internal_buffer.size(); + // info.bytes_read = res; + // info.nanoseconds = watch.elapsed(); + // profile_callback(info); + // } + } + + file_offset_of_buffer_end += bytes_read; + + if (bytes_read) + { + //ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadBytes, bytes_read); + working_buffer = internal_buffer; + working_buffer.resize(bytes_read); + } + else + return false; + + return true; +} + +ReadBufferFromCubeFS::ReadBufferFromCubeFS( + int64_t id_, + const std::string & file_name_, + std::shared_ptr sdk_loader_, + int flags, + char * existing_memory, + size_t buf_size, + size_t alignment, + std::optional file_size_) + : ReadBufferFromFileBase(buf_size, existing_memory, alignment, file_size_), id(id_), file_name(file_name_), sdk_loader(sdk_loader_) +{ + //ProfileEvents::increment(ProfileEvents::FileOpen); + + fd = sdk_loader->cfsOpen( + id_, const_cast(file_name_.data()), flags == -1 ? O_RDONLY | O_CLOEXEC : flags | O_CLOEXEC, S_IRUSR | S_IWUSR); + + if (-1 == fd) + throwFromErrnoWithPath( + "Cannot open file " + file_name_, file_name_, errno == ENOENT ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE); +} + +/// If 'offset' is small enough to stay in buffer after seek, then true seek in file does not happen. +off_t ReadBufferFromCubeFS::seek(off_t offset, int whence) +{ + size_t new_pos; + if (whence == SEEK_SET) + { + assert(offset >= 0); + new_pos = offset; + } + else if (whence == SEEK_CUR) + { + new_pos = file_offset_of_buffer_end - (working_buffer.end() - pos) + offset; + } + else + { + throw Exception("ReadBufferFromFileDescriptor::seek expects SEEK_SET or SEEK_CUR as whence", ErrorCodes::ARGUMENT_OUT_OF_BOUND); + } + + /// Position is unchanged. + if (new_pos + (working_buffer.end() - pos) == file_offset_of_buffer_end) + return new_pos; + + if (file_offset_of_buffer_end - working_buffer.size() <= static_cast(new_pos) && new_pos <= file_offset_of_buffer_end) + { + /// Position is still inside the buffer. + /// Probably it is at the end of the buffer - then we will load data on the following 'next' call. + + pos = working_buffer.end() - file_offset_of_buffer_end + new_pos; + assert(pos >= working_buffer.begin()); + assert(pos <= working_buffer.end()); + + return new_pos; + } + else + { + /// Position is out of the buffer, we need to do real seek. + off_t seek_pos = new_pos; + + off_t offset_after_seek_pos = new_pos - seek_pos; + + /// First reset the buffer so the next read will fetch new data to the buffer. + resetWorkingBuffer(); + + /// In case of using 'pread' we just update the info about the next position in file. + /// In case of using 'read' we call 'lseek'. + + /// We account both cases as seek event as it leads to non-contiguous reads from file. + ProfileEvents::increment(ProfileEvents::Seek); + file_offset_of_buffer_end = seek_pos; + + if (offset_after_seek_pos > 0) + ignore(offset_after_seek_pos); + + return seek_pos; + } +} +} diff --git a/src/IO/ReadBufferFromCubeFS.h b/src/IO/ReadBufferFromCubeFS.h new file mode 100644 index 000000000000..8eb09d886408 --- /dev/null +++ b/src/IO/ReadBufferFromCubeFS.h @@ -0,0 +1,52 @@ +#pragma once + +#include +#include + +namespace DB +{ +namespace ErrorCodes +{ + extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR; + extern const int ARGUMENT_OUT_OF_BOUND; + extern const int CANNOT_SEEK_THROUGH_FILE; + extern const int CANNOT_SELECT; + extern const int CANNOT_FSTAT; + extern const int CANNOT_ADVISE; +} +class ReadBufferFromCubeFS : public ReadBufferFromFileBase +{ +public: + explicit ReadBufferFromCubeFS( + int64_t id_, + const std::string & file_name_, + std::shared_ptr sdk_loader_, + int flags = -1, + char * existing_memory = nullptr, + size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, + size_t alignment = 0, + std::optional file_size_ = std::nullopt); + ~ReadBufferFromCubeFS() override; + std::string getFileName() const override { return file_name; } + Range getRemainingReadRange() const override { return Range{.left = file_offset_of_buffer_end, .right = std::nullopt}; } + size_t getFileOffsetOfBufferEnd() const override { return file_offset_of_buffer_end; } + off_t getPosition() override { return file_offset_of_buffer_end - (working_buffer.end() - pos); } + void close(); + /** Could be used before initialization if needed 'fd' was not passed to constructor. + * It's not possible to change 'fd' during work. + */ + void setFD(int fd_) { fd = fd_; } + int getFD() const { return fd; } + off_t size(); + /// If 'offset' is small enough to stay in buffer after seek, then true seek in file does not happen. + off_t seek(off_t off, int whence) override; + +private: + bool nextImpl() override; + int fd; + int64_t id; + std::string file_name; + std::shared_ptr sdk_loader; + size_t file_offset_of_buffer_end = 0; /// What offset in file corresponds to working_buffer.end(). +}; +} diff --git a/src/IO/WriteBufferFromCubeFS.cpp b/src/IO/WriteBufferFromCubeFS.cpp new file mode 100644 index 000000000000..ba63a2ddffaf --- /dev/null +++ b/src/IO/WriteBufferFromCubeFS.cpp @@ -0,0 +1,122 @@ +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int FILE_DOESNT_EXIST; + extern const int CANNOT_OPEN_FILE; + extern const int CANNOT_CLOSE_FILE; + extern const int CANNOT_WRITE_TO_FILE_DESCRIPTOR; + extern const int CANNOT_FSYNC; + extern const int CANNOT_SEEK_THROUGH_FILE; + extern const int CANNOT_TRUNCATE_FILE; + extern const int CANNOT_FSTAT; +} + +WriteBufferFromCubeFS::WriteBufferFromCubeFS( + int64_t id_, + const std::string & file_name_, + std::shared_ptr sdk_loader_, + size_t buf_size, + int flags, + mode_t mode, + char * existing_memory, + size_t alignment) + : WriteBufferFromFileBase(buf_size, existing_memory, alignment), id(id_), file_name(std::move(file_name_)), sdk_loader(sdk_loader_) +{ + //If O_APPEND is not added, cfsOpen will write from the specified offset position (0) each time. + fd = sdk_loader->cfsOpen( + id, + const_cast(file_name_.data()), + flags == -1 ? O_WRONLY | O_TRUNC | O_CREAT | O_CLOEXEC | O_APPEND : flags | O_CLOEXEC, + mode); + + if (-1 == fd) + throwFromErrnoWithPath( + "Cannot open file " + file_name, file_name, errno == ENOENT ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE); +} + +WriteBufferFromCubeFS::~WriteBufferFromCubeFS() +{ + finalize(); + sdk_loader->cfsClose(id, fd); +} + +void WriteBufferFromCubeFS::close() +{ + if (fd < 0) + return; + next(); + sdk_loader->cfsClose(id, fd); + fd = -1; +} + +void WriteBufferFromCubeFS::finalizeImpl() +{ + if (fd < 0) + return; + + next(); +} + +void WriteBufferFromCubeFS::nextImpl() +{ + if (!offset()) + return; + + //Stopwatch watch; + + size_t bytes_written = 0; + while (bytes_written != offset()) + { + //ProfileEvents::increment(ProfileEvents::WriteBufferFromFileDescriptorWrite); + + ssize_t res = 0; + { + //CurrentMetrics::Increment metric_increment{CurrentMetrics::Write}; + res = sdk_loader->cfsWrite(id, fd, working_buffer.begin() + bytes_written, offset() - bytes_written, 0); + } + + if ((-1 == res || 0 == res) && errno != EINTR) + { + //ProfileEvents::increment(ProfileEvents::WriteBufferFromFileDescriptorWriteFailed); + + /// Don't use getFileName() here because this method can be called from destructor + String error_file_name = file_name; + if (error_file_name.empty()) + error_file_name = "(fd = " + toString(fd) + ")"; + throwFromErrnoWithPath("Cannot write to file " + error_file_name, error_file_name, ErrorCodes::CANNOT_WRITE_TO_FILE_DESCRIPTOR); + } + + if (res > 0) + bytes_written += res; + } + + //ProfileEvents::increment(ProfileEvents::DiskWriteElapsedMicroseconds, watch.elapsedMicroseconds()); + //ProfileEvents::increment(ProfileEvents::WriteBufferFromFileDescriptorWriteBytes, bytes_written); +} + +void WriteBufferFromCubeFS::sync() +{ + /// If buffer has pending data - write it. + next(); + + int res = sdk_loader->cfsFlush(id, fd); + if (-1 == res) + throwFromErrnoWithPath("Cannot flush " + getFileName(), getFileName(), ErrorCodes::CANNOT_FSYNC); +} + +off_t WriteBufferFromCubeFS::size() +{ + struct cfs_stat_info file_info; + int result = sdk_loader->cfsGetattr(id, const_cast(file_name.data()), &file_info); + if (result != 0) + { + // Handle the error (throw an exception, return an error code, etc.) + throwFromErrnoWithPath("Cannot execute fstat " + getFileName(), getFileName(), ErrorCodes::CANNOT_FSTAT); + } + return file_info.size; +} +} diff --git a/src/IO/WriteBufferFromCubeFS.h b/src/IO/WriteBufferFromCubeFS.h new file mode 100644 index 000000000000..efbb283d250a --- /dev/null +++ b/src/IO/WriteBufferFromCubeFS.h @@ -0,0 +1,38 @@ +#pragma once +#include +#include + +namespace Poco +{ +class Logger; +} +namespace DB +{ +class WriteBufferFromCubeFS : public WriteBufferFromFileBase +{ +public: + explicit WriteBufferFromCubeFS( + int64_t id, + const std::string & file_name_, + std::shared_ptr sdk_loader_, + size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, + int flags = -1, + mode_t mode = 0666, + char * existing_memory = nullptr, + size_t alignment = 0); + ~WriteBufferFromCubeFS() override; + void close(); + void sync() override; + std::string getFileName() const override { return file_name; } + off_t size(); + Poco::Logger * log = &Poco::Logger::get("WriteBufferFromCubeFS"); + +private: + void finalizeImpl() override; + void nextImpl() override; + int64_t id; + std::string file_name; + std::shared_ptr sdk_loader; + int fd; +}; +}