From 7c819bee64d08a1c19116a87fa87080b9792113a Mon Sep 17 00:00:00 2001 From: jinhelin Date: Fri, 17 Feb 2023 18:05:04 +0800 Subject: [PATCH] Add AWS SDK (#6828) ref pingcap/tiflash#6827 --- .gitmodules | 42 +++ contrib/CMakeLists.txt | 2 + contrib/aws | 1 + contrib/aws-c-auth | 1 + contrib/aws-c-cal | 1 + contrib/aws-c-common | 1 + contrib/aws-c-compression | 1 + contrib/aws-c-event-stream | 1 + contrib/aws-c-http | 1 + contrib/aws-c-io | 1 + contrib/aws-c-mqtt | 1 + contrib/aws-c-s3 | 1 + contrib/aws-c-sdkutils | 1 + contrib/aws-checksums | 1 + contrib/aws-cmake/AwsFeatureTests.cmake | 114 ++++++ contrib/aws-cmake/AwsSIMD.cmake | 74 ++++ contrib/aws-cmake/AwsThreadAffinity.cmake | 50 +++ contrib/aws-cmake/AwsThreadName.cmake | 61 +++ contrib/aws-cmake/CMakeLists.txt | 355 ++++++++++++++++++ contrib/aws-crt-cpp | 1 + contrib/aws-s2n-tls | 1 + dbms/CMakeLists.txt | 2 + dbms/src/Common/ErrorCodes.cpp | 2 + dbms/src/Common/ProfileEvents.cpp | 11 +- dbms/src/Server/Server.cpp | 10 + dbms/src/Server/StorageConfigParser.cpp | 71 +++- dbms/src/Server/StorageConfigParser.h | 13 + .../src/Server/tests/gtest_storage_config.cpp | 70 ++++ dbms/src/Storages/S3/S3Common.cpp | 293 +++++++++++++++ dbms/src/Storages/S3/S3Common.h | 77 ++++ 30 files changed, 1240 insertions(+), 21 deletions(-) create mode 160000 contrib/aws create mode 160000 contrib/aws-c-auth create mode 160000 contrib/aws-c-cal create mode 160000 contrib/aws-c-common create mode 160000 contrib/aws-c-compression create mode 160000 contrib/aws-c-event-stream create mode 160000 contrib/aws-c-http create mode 160000 contrib/aws-c-io create mode 160000 contrib/aws-c-mqtt create mode 160000 contrib/aws-c-s3 create mode 160000 contrib/aws-c-sdkutils create mode 160000 contrib/aws-checksums create mode 100644 contrib/aws-cmake/AwsFeatureTests.cmake create mode 100644 contrib/aws-cmake/AwsSIMD.cmake create mode 100644 contrib/aws-cmake/AwsThreadAffinity.cmake create mode 100644 contrib/aws-cmake/AwsThreadName.cmake create mode 100644 contrib/aws-cmake/CMakeLists.txt create mode 160000 contrib/aws-crt-cpp create mode 160000 contrib/aws-s2n-tls create mode 100644 dbms/src/Storages/S3/S3Common.cpp create mode 100644 dbms/src/Storages/S3/S3Common.h diff --git a/.gitmodules b/.gitmodules index 44bb7d920ab..7cea237bd83 100644 --- a/.gitmodules +++ b/.gitmodules @@ -92,3 +92,45 @@ [submodule "contrib/GmSSL"] path = contrib/GmSSL url = https://github.com/guanzhi/GmSSL.git +[submodule "contrib/aws"] + path = contrib/aws + url = git@github.com:aws/aws-sdk-cpp.git +[submodule "contrib/aws-c-auth"] + path = contrib/aws-c-auth + url = git@github.com:awslabs/aws-c-auth.git +[submodule "contrib/aws-c-cal"] + path = contrib/aws-c-cal + url = git@github.com:ClickHouse/aws-c-cal.git +[submodule "contrib/aws-c-common"] + path = contrib/aws-c-common + url = git@github.com:awslabs/aws-c-common.git +[submodule "contrib/aws-c-compression"] + path = contrib/aws-c-compression + url = git@github.com:awslabs/aws-c-compression.git +[submodule "contrib/aws-c-event-stream"] + path = contrib/aws-c-event-stream + url = git@github.com:awslabs/aws-c-event-stream.git +[submodule "contrib/aws-c-http"] + path = contrib/aws-c-http + url = git@github.com:awslabs/aws-c-http.git +[submodule "contrib/aws-c-io"] + path = contrib/aws-c-io + url = git@github.com:awslabs/aws-c-io.git +[submodule "contrib/aws-c-mqtt"] + path = contrib/aws-c-mqtt + url = git@github.com:awslabs/aws-c-mqtt.git +[submodule "contrib/aws-c-s3"] + path = contrib/aws-c-s3 + url = git@github.com:awslabs/aws-c-s3.git +[submodule "contrib/aws-c-sdkutils"] + path = contrib/aws-c-sdkutils + url = git@github.com:awslabs/aws-c-sdkutils.git +[submodule "contrib/aws-checksums"] + path = contrib/aws-checksums + url = git@github.com:awslabs/aws-checksums.git +[submodule "contrib/aws-crt-cpp"] + path = contrib/aws-crt-cpp + url = git@github.com:awslabs/aws-crt-cpp.git +[submodule "contrib/aws-s2n-tls"] + path = contrib/aws-s2n-tls + url = git@github.com:aws/s2n-tls.git diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index cf5c6d5704d..e2113765cf3 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -177,3 +177,5 @@ if (ARCH_AARCH64 AND OS_LINUX) endif () add_subdirectory(magic_enum) + +add_subdirectory(aws-cmake) diff --git a/contrib/aws b/contrib/aws new file mode 160000 index 00000000000..af7d4160099 --- /dev/null +++ b/contrib/aws @@ -0,0 +1 @@ +Subproject commit af7d416009980d268235c2fccb5eb1bd9526980e diff --git a/contrib/aws-c-auth b/contrib/aws-c-auth new file mode 160000 index 00000000000..dd505b55fd4 --- /dev/null +++ b/contrib/aws-c-auth @@ -0,0 +1 @@ +Subproject commit dd505b55fd46222834f35c6e54165d8cbebbfaaa diff --git a/contrib/aws-c-cal b/contrib/aws-c-cal new file mode 160000 index 00000000000..85dd7664b78 --- /dev/null +++ b/contrib/aws-c-cal @@ -0,0 +1 @@ +Subproject commit 85dd7664b786a389c6fb1a6f031ab4bb2282133d diff --git a/contrib/aws-c-common b/contrib/aws-c-common new file mode 160000 index 00000000000..84cc08fb9ba --- /dev/null +++ b/contrib/aws-c-common @@ -0,0 +1 @@ +Subproject commit 84cc08fb9ba7061dca971ab95111c098f476a295 diff --git a/contrib/aws-c-compression b/contrib/aws-c-compression new file mode 160000 index 00000000000..b517b7decd0 --- /dev/null +++ b/contrib/aws-c-compression @@ -0,0 +1 @@ +Subproject commit b517b7decd0dac30be2162f5186c250221c53aff diff --git a/contrib/aws-c-event-stream b/contrib/aws-c-event-stream new file mode 160000 index 00000000000..2f9b60c42f9 --- /dev/null +++ b/contrib/aws-c-event-stream @@ -0,0 +1 @@ +Subproject commit 2f9b60c42f90840ec11822acda3d8cdfa97a773d diff --git a/contrib/aws-c-http b/contrib/aws-c-http new file mode 160000 index 00000000000..99894610086 --- /dev/null +++ b/contrib/aws-c-http @@ -0,0 +1 @@ +Subproject commit 99894610086372df1cf09d62b9eafca42eb53f5b diff --git a/contrib/aws-c-io b/contrib/aws-c-io new file mode 160000 index 00000000000..f2ff573c191 --- /dev/null +++ b/contrib/aws-c-io @@ -0,0 +1 @@ +Subproject commit f2ff573c191e1c4ea0248af5c08161356be3bc78 diff --git a/contrib/aws-c-mqtt b/contrib/aws-c-mqtt new file mode 160000 index 00000000000..5cbde90916a --- /dev/null +++ b/contrib/aws-c-mqtt @@ -0,0 +1 @@ +Subproject commit 5cbde90916a1f9945e2a1ef36f3db58e1c976167 diff --git a/contrib/aws-c-s3 b/contrib/aws-c-s3 new file mode 160000 index 00000000000..91e03c1875f --- /dev/null +++ b/contrib/aws-c-s3 @@ -0,0 +1 @@ +Subproject commit 91e03c1875f8c28a71744d75b980336191f096e2 diff --git a/contrib/aws-c-sdkutils b/contrib/aws-c-sdkutils new file mode 160000 index 00000000000..208a701fa01 --- /dev/null +++ b/contrib/aws-c-sdkutils @@ -0,0 +1 @@ +Subproject commit 208a701fa01e99c7c8cc3dcebc8317da71362972 diff --git a/contrib/aws-checksums b/contrib/aws-checksums new file mode 160000 index 00000000000..ad53be196a2 --- /dev/null +++ b/contrib/aws-checksums @@ -0,0 +1 @@ +Subproject commit ad53be196a25bbefa3700a01187fdce573a7d2d0 diff --git a/contrib/aws-cmake/AwsFeatureTests.cmake b/contrib/aws-cmake/AwsFeatureTests.cmake new file mode 100644 index 00000000000..54727e08d6b --- /dev/null +++ b/contrib/aws-cmake/AwsFeatureTests.cmake @@ -0,0 +1,114 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0. + +include(CheckCSourceRuns) + +option(USE_CPU_EXTENSIONS "Whenever possible, use functions optimized for CPUs with specific extensions (ex: SSE, AVX)." ON) + +# In the current (11/2/21) state of mingw64, the packaged gcc is not capable of emitting properly aligned avx2 instructions under certain circumstances. +# This leads to crashes for windows builds using mingw64 when invoking the avx2-enabled versions of certain functions. Until we can find a better +# work-around, disable avx2 (and all other extensions) in mingw builds. +# +# https://gcc.gnu.org/bugzilla/show_bug.cgi?id=54412 +# +if (MINGW) + message(STATUS "MINGW detected! Disabling avx2 and other CPU extensions") + set(USE_CPU_EXTENSIONS OFF) +endif() + +if(NOT CMAKE_CROSSCOMPILING) + check_c_source_runs(" + #include + bool foo(int a, int b, int *c) { + return __builtin_mul_overflow(a, b, c); + } + + int main() { + int out; + if (foo(1, 2, &out)) { + return 0; + } + + return 0; + }" AWS_HAVE_GCC_OVERFLOW_MATH_EXTENSIONS) + + if (USE_CPU_EXTENSIONS) + check_c_source_runs(" + int main() { + int foo = 42; + _mulx_u32(1, 2, &foo); + return foo != 2; + }" AWS_HAVE_MSVC_MULX) + endif() + +endif() + +check_c_source_compiles(" + #include + #if WINAPI_FAMILY_PARTITION(WINAPI_PARTITION_DESKTOP) + int main() { + return 0; + } + #else + it's not windows desktop + #endif +" AWS_HAVE_WINAPI_DESKTOP) + +check_c_source_compiles(" + int main() { +#if !(defined(__x86_64__) || defined(__i386__) || defined(_M_X64) || defined(_M_IX86)) +# error \"not intel\" +#endif + return 0; + } +" AWS_ARCH_INTEL) + +check_c_source_compiles(" + int main() { +#if !(defined(__aarch64__) || defined(_M_ARM64)) +# error \"not arm64\" +#endif + return 0; + } +" AWS_ARCH_ARM64) + +check_c_source_compiles(" + int main() { +#if !(defined(__arm__) || defined(_M_ARM)) +# error \"not arm\" +#endif + return 0; + } +" AWS_ARCH_ARM32) + +check_c_source_compiles(" +int main() { + int foo = 42, bar = 24; + __asm__ __volatile__(\"\":\"=r\"(foo):\"r\"(bar):\"memory\"); +}" AWS_HAVE_GCC_INLINE_ASM) + +check_c_source_compiles(" +#include +int main() { +#ifdef __linux__ + getauxval(AT_HWCAP); + getauxval(AT_HWCAP2); +#endif + return 0; +}" AWS_HAVE_AUXV) + +string(REGEX MATCH "^(aarch64|arm)" ARM_CPU "${CMAKE_SYSTEM_PROCESSOR}") +if(NOT LEGACY_COMPILER_SUPPORT OR ARM_CPU) + check_c_source_compiles(" + #include + int main() { + backtrace(NULL, 0); + return 0; + }" AWS_HAVE_EXECINFO) +endif() + +check_c_source_compiles(" +#include +int main() { + return 1; +}" AWS_HAVE_LINUX_IF_LINK_H) diff --git a/contrib/aws-cmake/AwsSIMD.cmake b/contrib/aws-cmake/AwsSIMD.cmake new file mode 100644 index 00000000000..bd6f4064e78 --- /dev/null +++ b/contrib/aws-cmake/AwsSIMD.cmake @@ -0,0 +1,74 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0. + +include(CheckCCompilerFlag) +include(CheckIncludeFile) + +if (USE_CPU_EXTENSIONS) + if (MSVC) + check_c_compiler_flag("/arch:AVX2" HAVE_M_AVX2_FLAG) + if (HAVE_M_AVX2_FLAG) + set(AVX2_CFLAGS "/arch:AVX2") + endif() + else() + check_c_compiler_flag(-mavx2 HAVE_M_AVX2_FLAG) + if (HAVE_M_AVX2_FLAG) + set(AVX2_CFLAGS "-mavx -mavx2") + endif() + endif() + + + cmake_push_check_state() + set(CMAKE_REQUIRED_FLAGS "${CMAKE_REQUIRED_FLAGS} ${AVX2_CFLAGS}") + + check_c_source_compiles(" + #include + #include + #include + + int main() { + __m256i vec; + memset(&vec, 0, sizeof(vec)); + + _mm256_shuffle_epi8(vec, vec); + _mm256_set_epi32(1,2,3,4,5,6,7,8); + _mm256_permutevar8x32_epi32(vec, vec); + + return 0; + }" HAVE_AVX2_INTRINSICS) + + check_c_source_compiles(" + #include + #include + + int main() { + __m256i vec; + memset(&vec, 0, sizeof(vec)); + return (int)_mm256_extract_epi64(vec, 2); + }" HAVE_MM256_EXTRACT_EPI64) + + cmake_pop_check_state() +endif() # USE_CPU_EXTENSIONS + +macro(simd_add_definition_if target definition) + if(${definition}) + target_compile_definitions(${target} PRIVATE -D${definition}) + endif(${definition}) +endmacro(simd_add_definition_if) + +# Configure private preprocessor definitions for SIMD-related features +# Does not set any processor feature codegen flags +function(simd_add_definitions target) + simd_add_definition_if(${target} HAVE_AVX2_INTRINSICS) + simd_add_definition_if(${target} HAVE_MM256_EXTRACT_EPI64) +endfunction(simd_add_definitions) + +# Adds source files only if AVX2 is supported. These files will be built with +# avx2 intrinsics enabled. +# Usage: simd_add_source_avx2(target file1.c file2.c ...) +function(simd_add_source_avx2 target) + foreach(file ${ARGN}) + target_sources(${target} PRIVATE ${file}) + set_source_files_properties(${file} PROPERTIES COMPILE_FLAGS "${AVX2_CFLAGS}") + endforeach() +endfunction(simd_add_source_avx2) diff --git a/contrib/aws-cmake/AwsThreadAffinity.cmake b/contrib/aws-cmake/AwsThreadAffinity.cmake new file mode 100644 index 00000000000..9e53481272c --- /dev/null +++ b/contrib/aws-cmake/AwsThreadAffinity.cmake @@ -0,0 +1,50 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0. + +include(CheckSymbolExists) + +# Check if the platform supports setting thread affinity +# (important for hitting full NIC entitlement on NUMA architectures) +function(aws_set_thread_affinity_method target) + + # Non-POSIX, Android, and Apple platforms do not support thread affinity. + if (NOT UNIX OR ANDROID OR APPLE) + target_compile_definitions(${target} PRIVATE + -DAWS_AFFINITY_METHOD=AWS_AFFINITY_METHOD_NONE) + return() + endif() + + cmake_push_check_state() + list(APPEND CMAKE_REQUIRED_DEFINITIONS -D_GNU_SOURCE) + list(APPEND CMAKE_REQUIRED_LIBRARIES pthread) + + set(headers "pthread.h") + # BSDs put nonportable pthread declarations in a separate header. + if(CMAKE_SYSTEM_NAME MATCHES BSD) + set(headers "${headers};pthread_np.h") + endif() + + # Using pthread attrs is the preferred method, but is glibc-specific. + check_symbol_exists(pthread_attr_setaffinity_np "${headers}" USE_PTHREAD_ATTR_SETAFFINITY) + if (USE_PTHREAD_ATTR_SETAFFINITY) + target_compile_definitions(${target} PRIVATE + -DAWS_AFFINITY_METHOD=AWS_AFFINITY_METHOD_PTHREAD_ATTR) + return() + endif() + + # This method is still nonportable, but is supported by musl and BSDs. + check_symbol_exists(pthread_setaffinity_np "${headers}" USE_PTHREAD_SETAFFINITY) + if (USE_PTHREAD_SETAFFINITY) + target_compile_definitions(${target} PRIVATE + -DAWS_AFFINITY_METHOD=AWS_AFFINITY_METHOD_PTHREAD) + return() + endif() + + # If we got here, we expected thread affinity support but didn't find it. + # We still build with degraded NUMA performance, but show a warning. + message(WARNING "No supported method for setting thread affinity") + target_compile_definitions(${target} PRIVATE + -DAWS_AFFINITY_METHOD=AWS_AFFINITY_METHOD_NONE) + + cmake_pop_check_state() +endfunction() diff --git a/contrib/aws-cmake/AwsThreadName.cmake b/contrib/aws-cmake/AwsThreadName.cmake new file mode 100644 index 00000000000..a67416b4f83 --- /dev/null +++ b/contrib/aws-cmake/AwsThreadName.cmake @@ -0,0 +1,61 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0. + +include(CheckSymbolExists) + +# Check how the platform supports setting thread name +function(aws_set_thread_name_method target) + + if (WINDOWS) + # On Windows we do a runtime check, instead of compile-time check + return() + elseif (APPLE) + # All Apple platforms we support have the same function, so no need for compile-time check. + return() + endif() + + cmake_push_check_state() + list(APPEND CMAKE_REQUIRED_DEFINITIONS -D_GNU_SOURCE) + list(APPEND CMAKE_REQUIRED_LIBRARIES pthread) + + # The start of the test program + set(c_source_start " + #define _GNU_SOURCE + #include + + #if defined(__FreeBSD__) || defined(__NETBSD__) + #include + #endif + + int main() { + pthread_t thread_id; + ") + + # The end of the test program + set(c_source_end "}") + + # pthread_setname_np() usually takes 2 args + check_c_source_compiles(" + ${c_source_start} + pthread_setname_np(thread_id, \"asdf\"); + ${c_source_end}" + PTHREAD_SETNAME_TAKES_2ARGS) + if (PTHREAD_SETNAME_TAKES_2ARGS) + target_compile_definitions(${target} PRIVATE -DAWS_PTHREAD_SETNAME_TAKES_2ARGS) + return() + endif() + + # But on NetBSD it takes 3! + check_c_source_compiles(" + ${c_source_start} + pthread_setname_np(thread_id, \"asdf\", NULL); + ${c_source_end} + " PTHREAD_SETNAME_TAKES_3ARGS) + if (PTHREAD_SETNAME_TAKES_3ARGS) + target_compile_definitions(${target} PRIVATE -DAWS_PTHREAD_SETNAME_TAKES_3ARGS) + return() + endif() + + # And on many older/weirder platforms it's just not supported + cmake_pop_check_state() +endfunction() diff --git a/contrib/aws-cmake/CMakeLists.txt b/contrib/aws-cmake/CMakeLists.txt new file mode 100644 index 00000000000..3775472c51f --- /dev/null +++ b/contrib/aws-cmake/CMakeLists.txt @@ -0,0 +1,355 @@ +# Utilities. +include("${TiFlash_SOURCE_DIR}/contrib/aws-cmake/AwsFeatureTests.cmake") +include("${TiFlash_SOURCE_DIR}/contrib/aws-cmake/AwsThreadAffinity.cmake") +include("${TiFlash_SOURCE_DIR}/contrib/aws-cmake/AwsThreadName.cmake") +include("${TiFlash_SOURCE_DIR}/contrib/aws-cmake/AwsSIMD.cmake") + +# Gather sources and options. +set(AWS_SOURCES) +set(AWS_PUBLIC_INCLUDES) +set(AWS_PRIVATE_INCLUDES) +set(AWS_PUBLIC_COMPILE_DEFS) +set(AWS_PRIVATE_COMPILE_DEFS) +set(AWS_PRIVATE_LIBS) + +list(APPEND AWS_PRIVATE_COMPILE_DEFS "-DENABLE_CURL_CLIENT") + +if (CMAKE_BUILD_TYPE_UC STREQUAL "DEBUG") + list(APPEND AWS_PRIVATE_COMPILE_DEFS "-DDEBUG_BUILD") +endif() + +set(ENABLE_OPENSSL_ENCRYPTION ON) +if (ENABLE_OPENSSL_ENCRYPTION) + list(APPEND AWS_PRIVATE_COMPILE_DEFS "-DENABLE_OPENSSL_ENCRYPTION") +endif() + +set(USE_S2N ON) +if (USE_S2N) + list(APPEND AWS_PRIVATE_COMPILE_DEFS "-DUSE_S2N") +endif() + +# Directories. +SET(AWS_SDK_DIR "${TiFlash_SOURCE_DIR}/contrib/aws") +SET(AWS_SDK_CORE_DIR "${AWS_SDK_DIR}/src/aws-cpp-sdk-core") +SET(AWS_SDK_S3_DIR "${AWS_SDK_DIR}/generated/src/aws-cpp-sdk-s3") + +SET(AWS_AUTH_DIR "${TiFlash_SOURCE_DIR}/contrib/aws-c-auth") +SET(AWS_CAL_DIR "${TiFlash_SOURCE_DIR}/contrib/aws-c-cal") +SET(AWS_CHECKSUMS_DIR "${TiFlash_SOURCE_DIR}/contrib/aws-checksums") +SET(AWS_COMMON_DIR "${TiFlash_SOURCE_DIR}/contrib/aws-c-common") +SET(AWS_COMPRESSION_DIR "${TiFlash_SOURCE_DIR}/contrib/aws-c-compression") +SET(AWS_CRT_DIR "${TiFlash_SOURCE_DIR}/contrib/aws-crt-cpp") +SET(AWS_EVENT_STREAM_DIR "${TiFlash_SOURCE_DIR}/contrib/aws-c-event-stream") +SET(AWS_HTTP_DIR "${TiFlash_SOURCE_DIR}/contrib/aws-c-http") +SET(AWS_IO_DIR "${TiFlash_SOURCE_DIR}/contrib/aws-c-io") +SET(AWS_MQTT_DIR "${TiFlash_SOURCE_DIR}/contrib/aws-c-mqtt") +SET(AWS_S2N_TLS_DIR "${TiFlash_SOURCE_DIR}/contrib/aws-s2n-tls") +SET(AWS_S3_DIR "${TiFlash_SOURCE_DIR}/contrib/aws-c-s3") +SET(AWS_SDKUTILS_DIR "${TiFlash_SOURCE_DIR}/contrib/aws-c-sdkutils") + + +# aws-cpp-sdk-core +file(GLOB AWS_SDK_CORE_SRC + "${AWS_SDK_CORE_DIR}/source/*.cpp" + "${AWS_SDK_CORE_DIR}/source/auth/*.cpp" + "${AWS_SDK_CORE_DIR}/source/auth/bearer-token-provider/*.cpp" + "${AWS_SDK_CORE_DIR}/source/auth/signer/*.cpp" + "${AWS_SDK_CORE_DIR}/source/auth/signer-provider/*.cpp" + "${AWS_SDK_CORE_DIR}/source/client/*.cpp" + "${AWS_SDK_CORE_DIR}/source/config/*.cpp" + "${AWS_SDK_CORE_DIR}/source/config/defaults/*.cpp" + "${AWS_SDK_CORE_DIR}/source/endpoint/*.cpp" + "${AWS_SDK_CORE_DIR}/source/endpoint/internal/*.cpp" + "${AWS_SDK_CORE_DIR}/source/external/cjson/*.cpp" + "${AWS_SDK_CORE_DIR}/source/external/tinyxml2/*.cpp" + "${AWS_SDK_CORE_DIR}/source/http/*.cpp" + "${AWS_SDK_CORE_DIR}/source/http/standard/*.cpp" + "${AWS_SDK_CORE_DIR}/source/http/curl/*.cpp" + "${AWS_SDK_CORE_DIR}/source/internal/*.cpp" + "${AWS_SDK_CORE_DIR}/source/monitoring/*.cpp" + "${AWS_SDK_CORE_DIR}/source/utils/*.cpp" + "${AWS_SDK_CORE_DIR}/source/utils/base64/*.cpp" + "${AWS_SDK_CORE_DIR}/source/utils/crypto/*.cpp" + "${AWS_SDK_CORE_DIR}/source/utils/crypto/openssl/*.cpp" + "${AWS_SDK_CORE_DIR}/source/utils/crypto/factory/*.cpp" + "${AWS_SDK_CORE_DIR}/source/utils/event/*.cpp" + "${AWS_SDK_CORE_DIR}/source/utils/json/*.cpp" + "${AWS_SDK_CORE_DIR}/source/utils/logging/*.cpp" + "${AWS_SDK_CORE_DIR}/source/utils/memory/*.cpp" + "${AWS_SDK_CORE_DIR}/source/utils/memory/stl/*.cpp" + "${AWS_SDK_CORE_DIR}/source/utils/stream/*.cpp" + "${AWS_SDK_CORE_DIR}/source/utils/threading/*.cpp" + "${AWS_SDK_CORE_DIR}/source/utils/xml/*.cpp" +) + +if(OS_LINUX OR OS_DARWIN) + file(GLOB AWS_SDK_CORE_NET_SRC "${AWS_SDK_CORE_DIR}/source/net/linux-shared/*.cpp") + file(GLOB AWS_SDK_CORE_PLATFORM_SRC "${AWS_SDK_CORE_DIR}/source/platform/linux-shared/*.cpp") +else() + file(GLOB AWS_SDK_CORE_NET_SRC "${AWS_SDK_CORE_DIR}/source/net/*.cpp") + set(AWS_SDK_CORE_PLATFORM_SRC) +endif() + +OPTION(USE_AWS_MEMORY_MANAGEMENT "Aws memory management" OFF) +configure_file("${AWS_SDK_CORE_DIR}/include/aws/core/SDKConfig.h.in" + "${CMAKE_CURRENT_BINARY_DIR}/include/aws/core/SDKConfig.h" @ONLY) + +list(APPEND AWS_PUBLIC_COMPILE_DEFS "-DAWS_SDK_VERSION_MAJOR=1") +list(APPEND AWS_PUBLIC_COMPILE_DEFS "-DAWS_SDK_VERSION_MINOR=10") +list(APPEND AWS_PUBLIC_COMPILE_DEFS "-DAWS_SDK_VERSION_PATCH=36") + +list(APPEND AWS_SOURCES ${AWS_SDK_CORE_SRC} ${AWS_SDK_CORE_NET_SRC} ${AWS_SDK_CORE_PLATFORM_SRC}) + +list(APPEND AWS_PUBLIC_INCLUDES + "${AWS_SDK_CORE_DIR}/include/" + "${CMAKE_CURRENT_BINARY_DIR}/include" +) + + +# aws-cpp-sdk-s3 +file(GLOB AWS_SDK_S3_SRC + "${AWS_SDK_S3_DIR}/source/*.cpp" + "${AWS_SDK_S3_DIR}/source/model/*.cpp" +) + +list(APPEND AWS_SOURCES ${AWS_SDK_S3_SRC}) +list(APPEND AWS_PUBLIC_INCLUDES "${AWS_SDK_S3_DIR}/include/") + + +# aws-c-auth +file(GLOB AWS_AUTH_SRC + "${AWS_AUTH_DIR}/source/*.c" +) + +list(APPEND AWS_SOURCES ${AWS_AUTH_SRC}) +list(APPEND AWS_PUBLIC_INCLUDES "${AWS_AUTH_DIR}/include/") + + +# aws-c-cal +file(GLOB AWS_CAL_SRC + "${AWS_CAL_DIR}/source/*.c" +) + +if (ENABLE_OPENSSL_ENCRYPTION) + file(GLOB AWS_CAL_OS_SRC + "${AWS_CAL_DIR}/source/unix/*.c" + ) + list(APPEND AWS_PRIVATE_LIBS OpenSSL::Crypto) +endif() + +list(APPEND AWS_SOURCES ${AWS_CAL_SRC} ${AWS_CAL_OS_SRC}) +list(APPEND AWS_PRIVATE_INCLUDES "${AWS_CAL_DIR}/include/") + + +# aws-c-event-stream +file(GLOB AWS_EVENT_STREAM_SRC + "${AWS_EVENT_STREAM_DIR}/source/*.c" +) + +list(APPEND AWS_SOURCES ${AWS_EVENT_STREAM_SRC}) +list(APPEND AWS_PRIVATE_INCLUDES "${AWS_EVENT_STREAM_DIR}/include/") + + +# aws-c-common +file(GLOB AWS_COMMON_SRC + "${AWS_COMMON_DIR}/source/*.c" + "${AWS_COMMON_DIR}/source/external/*.c" + "${AWS_COMMON_DIR}/source/posix/*.c" +) + +file(GLOB AWS_COMMON_ARCH_SRC + "${AWS_COMMON_DIR}/source/arch/generic/*.c" +) + +if (AWS_ARCH_INTEL) + file(GLOB AWS_COMMON_ARCH_SRC + "${AWS_COMMON_DIR}/source/arch/intel/cpuid.c" + "${AWS_COMMON_DIR}/source/arch/intel/asm/*.c" + ) +elseif (AWS_ARCH_ARM64 OR AWS_ARCH_ARM32) + if (AWS_HAVE_AUXV) + file(GLOB AWS_COMMON_ARCH_SRC + "${AWS_COMMON_DIR}/source/arch/arm/asm/*.c" + ) + endif() +endif() + +set(AWS_COMMON_AVX2_SRC) +if (HAVE_AVX2_INTRINSICS) + list(APPEND AWS_PRIVATE_COMPILE_DEFS "-DUSE_SIMD_ENCODING") + set(AWS_COMMON_AVX2_SRC "${AWS_COMMON_DIR}/source/arch/intel/encoding_avx2.c") + set_source_files_properties(${AWS_COMMON_AVX2_SRC} PROPERTIES COMPILE_FLAGS "${AVX2_CFLAGS}") +endif() + +configure_file("${AWS_COMMON_DIR}/include/aws/common/config.h.in" + "${CMAKE_CURRENT_BINARY_DIR}/include/aws/common/config.h" @ONLY) + +list(APPEND AWS_SOURCES ${AWS_COMMON_SRC} ${AWS_COMMON_ARCH_SRC} ${AWS_COMMON_AVX2_SRC}) + +list(APPEND AWS_PUBLIC_INCLUDES + "${AWS_COMMON_DIR}/include/" + "${CMAKE_CURRENT_BINARY_DIR}/include" +) + + +# aws-checksums +file(GLOB AWS_CHECKSUMS_SRC + "${AWS_CHECKSUMS_DIR}/source/*.c" + "${AWS_CHECKSUMS_DIR}/source/intel/*.c" + "${AWS_CHECKSUMS_DIR}/source/intel/asm/*.c" + "${AWS_CHECKSUMS_DIR}/source/arm/*.c" +) + +if(AWS_ARCH_INTEL AND AWS_HAVE_GCC_INLINE_ASM) + file(GLOB AWS_CHECKSUMS_ARCH_SRC + "${AWS_CHECKSUMS_DIR}/source/intel/asm/*.c" + ) +endif() + +if (AWS_ARCH_ARM64) + file(GLOB AWS_CHECKSUMS_ARCH_SRC + "${AWS_CHECKSUMS_DIR}/source/arm/*.c" + ) + set_source_files_properties("${AWS_CHECKSUMS_DIR}/source/arm/crc32c_arm.c" PROPERTIES COMPILE_FLAGS -march=armv8-a+crc) +elseif (AWS_ARCH_ARM32) + if (AWS_ARM32_CRC) + file(GLOB AWS_CHECKSUMS_ARCH_SRC + "${AWS_CHECKSUMS_DIR}/source/arm/*.c" + "${AWS_CHECKSUMS_DIR}/source/arm/asm/*.c" + ) + set_source_files_properties(source/arm/crc32c_arm.c PROPERTIES COMPILE_FLAGS -march=armv8-a+crc) + endif() +endif() + +list(APPEND AWS_SOURCES ${AWS_CHECKSUMS_SRC} ${AWS_CHECKSUMS_ARCH_SRC}) +list(APPEND AWS_PRIVATE_INCLUDES "${AWS_CHECKSUMS_DIR}/include/") + + +# aws-c-io +file(GLOB AWS_IO_SRC + "${AWS_IO_DIR}/source/*.c" +) + +if (OS_LINUX) + file(GLOB AWS_IO_OS_SRC + "${AWS_IO_DIR}/source/linux/*.c" + "${AWS_IO_DIR}/source/posix/*.c" + ) +elseif (OS_DARWIN) + file(GLOB AWS_IO_OS_SRC + "${AWS_IO_DIR}/source/bsd/*.c" + "${AWS_IO_DIR}/source/posix/*.c" + ) +endif() + +set(AWS_IO_TLS_SRC) +if (USE_S2N) + file(GLOB AWS_IO_TLS_SRC + "${AWS_IO_DIR}/source/s2n/*.c" + ) +endif() + +list(APPEND AWS_SOURCES ${AWS_IO_SRC} ${AWS_IO_OS_SRC} ${AWS_IO_TLS_SRC}) +list(APPEND AWS_PUBLIC_INCLUDES "${AWS_IO_DIR}/include/") + + +# aws-s2n-tls +if (USE_S2N) + file(GLOB AWS_S2N_TLS_SRC + "${AWS_S2N_TLS_DIR}/crypto/*.c" + "${AWS_S2N_TLS_DIR}/error/*.c" + "${AWS_S2N_TLS_DIR}/stuffer/*.c" + "${AWS_S2N_TLS_DIR}/pq-crypto/*.c" + "${AWS_S2N_TLS_DIR}/pq-crypto/kyber_r3/*.c" + "${AWS_S2N_TLS_DIR}/tls/*.c" + "${AWS_S2N_TLS_DIR}/tls/extensions/*.c" + "${AWS_S2N_TLS_DIR}/utils/*.c" + ) + + list(APPEND AWS_SOURCES ${AWS_S2N_TLS_SRC}) + + list(APPEND AWS_PRIVATE_INCLUDES + "${AWS_S2N_TLS_DIR}/" + "${AWS_S2N_TLS_DIR}/api/" + ) +endif() + + +# aws-crt-cpp +file(GLOB AWS_CRT_SRC + "${AWS_CRT_DIR}/source/*.cpp" + "${AWS_CRT_DIR}/source/auth/*.cpp" + "${AWS_CRT_DIR}/source/crypto/*.cpp" + "${AWS_CRT_DIR}/source/endpoints/*.cpp" + "${AWS_CRT_DIR}/source/external/*.cpp" + "${AWS_CRT_DIR}/source/http/*.cpp" + "${AWS_CRT_DIR}/source/io/*.cpp" +) + +list(APPEND AWS_SOURCES ${AWS_CRT_SRC}) +list(APPEND AWS_PUBLIC_INCLUDES "${AWS_CRT_DIR}/include/") + + +# aws-c-mqtt +file(GLOB AWS_MQTT_SRC + "${AWS_MQTT_DIR}/source/*.c" +) + +list(APPEND AWS_SOURCES ${AWS_MQTT_SRC}) +list(APPEND AWS_PUBLIC_INCLUDES "${AWS_MQTT_DIR}/include/") + + +# aws-c-http +file(GLOB AWS_HTTP_SRC + "${AWS_HTTP_DIR}/source/*.c" +) + +list(APPEND AWS_SOURCES ${AWS_HTTP_SRC}) +list(APPEND AWS_PRIVATE_INCLUDES "${AWS_HTTP_DIR}/include/") + + +# aws-c-compression +file(GLOB AWS_COMPRESSION_SRC + "${AWS_COMPRESSION_DIR}/source/*.c" +) + +list(APPEND AWS_SOURCES ${AWS_COMPRESSION_SRC}) +list(APPEND AWS_PRIVATE_INCLUDES "${AWS_COMPRESSION_DIR}/include/") + + +# aws-c-s3 +file(GLOB AWS_S3_SRC + "${AWS_S3_DIR}/source/*.c" +) + +list(APPEND AWS_SOURCES ${AWS_S3_SRC}) +list(APPEND AWS_PRIVATE_INCLUDES "${AWS_S3_DIR}/include/") + + +# aws-c-sdkutils +file(GLOB AWS_SDKUTILS_SRC + "${AWS_SDKUTILS_DIR}/source/*.c" +) + +list(APPEND AWS_SOURCES ${AWS_SDKUTILS_SRC}) +list(APPEND AWS_PUBLIC_INCLUDES "${AWS_SDKUTILS_DIR}/include/") + + +# Add library. +add_library(_aws ${AWS_SOURCES}) + +target_include_directories(_aws SYSTEM BEFORE PUBLIC ${AWS_PUBLIC_INCLUDES}) +target_include_directories(_aws SYSTEM BEFORE PRIVATE ${AWS_PRIVATE_INCLUDES}) +target_compile_definitions(_aws PUBLIC ${AWS_PUBLIC_COMPILE_DEFS}) +target_compile_definitions(_aws PRIVATE ${AWS_PRIVATE_COMPILE_DEFS}) +target_link_libraries(_aws PRIVATE ${AWS_PRIVATE_LIBS}) + +aws_set_thread_affinity_method(_aws) +aws_set_thread_name_method(_aws) + +# The library is large - avoid bloat. +if (OMIT_HEAVY_DEBUG_SYMBOLS) + target_compile_options (_aws PRIVATE -g0) +endif() + +add_library(tiflash_contrib::aws_s3 ALIAS _aws) diff --git a/contrib/aws-crt-cpp b/contrib/aws-crt-cpp new file mode 160000 index 00000000000..7ff9e0343c9 --- /dev/null +++ b/contrib/aws-crt-cpp @@ -0,0 +1 @@ +Subproject commit 7ff9e0343c978fc54f440b98147c2f72d304f6d8 diff --git a/contrib/aws-s2n-tls b/contrib/aws-s2n-tls new file mode 160000 index 00000000000..0725d3c0bb5 --- /dev/null +++ b/contrib/aws-s2n-tls @@ -0,0 +1 @@ +Subproject commit 0725d3c0bb5bc1383310e19dd94c821a9234d299 diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt index 6e737559de2..628c47fc4b5 100644 --- a/dbms/CMakeLists.txt +++ b/dbms/CMakeLists.txt @@ -103,6 +103,7 @@ add_headers_and_sources(dbms src/Storages/Page/V3/PageDirectory) add_headers_and_sources(dbms src/Storages/Page/V3/Blob) add_headers_and_sources(dbms src/Storages/Page/V3/Universal) add_headers_and_sources(dbms src/Storages/Page/) +add_headers_and_sources(dbms src/Storages/S3) add_headers_and_sources(dbms src/TiDB) add_headers_and_sources(dbms src/Client) add_headers_only(dbms src/Flash/Coprocessor) @@ -225,6 +226,7 @@ target_link_libraries (dbms ${OPENSSL_CRYPTO_LIBRARY} ${BTRIE_LIBRARIES} absl::synchronization + tiflash_contrib::aws_s3 ) # always add GmSSL include dir to the include path for static analysis diff --git a/dbms/src/Common/ErrorCodes.cpp b/dbms/src/Common/ErrorCodes.cpp index 4c3c2532c6a..e1e53993a43 100644 --- a/dbms/src/Common/ErrorCodes.cpp +++ b/dbms/src/Common/ErrorCodes.cpp @@ -425,6 +425,8 @@ extern const int PTHREAD_ERROR = 10014; extern const int PS_ENTRY_NOT_EXISTS = 10015; extern const int PS_ENTRY_NO_VALID_VERSION = 10016; extern const int PS_DIR_APPLY_INVALID_STATUS = 10017; + +extern const int S3_ERROR = 11000; } // namespace ErrorCodes } // namespace DB diff --git a/dbms/src/Common/ProfileEvents.cpp b/dbms/src/Common/ProfileEvents.cpp index 5944d83fafb..c99d8a53f96 100644 --- a/dbms/src/Common/ProfileEvents.cpp +++ b/dbms/src/Common/ProfileEvents.cpp @@ -112,7 +112,16 @@ \ M(ChecksumDigestBytes) \ \ - M(RaftWaitIndexTimeout) + M(RaftWaitIndexTimeout) \ + \ + M(S3WriteBytes) \ + M(S3ReadBytes) \ + M(S3CreateMultipartUpload) \ + M(S3UploadPart) \ + M(S3CompleteMultipartUpload) \ + M(S3PutObject) \ + M(S3GetObject) \ + M(S3HeadObject) namespace ProfileEvents { diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 54d71e322e7..98f368d4005 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -74,6 +74,7 @@ #include #include #include +#include #include #include #include @@ -946,6 +947,11 @@ int Server::main(const std::vector & /*args*/) TiFlashStorageConfig storage_config; std::tie(global_capacity_quota, storage_config) = TiFlashStorageConfig::parseSettings(config(), log); + if (storage_config.s3_config.isS3Enabled()) + { + S3::ClientFactory::instance().init(storage_config.s3_config); + } + if (storage_config.format_version) { setStorageFormat(storage_config.format_version); @@ -1255,6 +1261,10 @@ int Server::main(const std::vector & /*args*/) // `Context::shutdown()` will destroy `DeltaIndexManager`. // So, stop threads explicitly before `TiFlashTestEnv::shutdown()`. DB::DM::SegmentReaderPoolManager::instance().stop(); + if (storage_config.s3_config.isS3Enabled()) + { + S3::ClientFactory::instance().shutdown(); + } global_context->shutdown(); LOG_DEBUG(log, "Shutted down storages."); }); diff --git a/dbms/src/Server/StorageConfigParser.cpp b/dbms/src/Server/StorageConfigParser.cpp index f7880a58339..32b4ed6c88b 100644 --- a/dbms/src/Server/StorageConfigParser.cpp +++ b/dbms/src/Server/StorageConfigParser.cpp @@ -13,6 +13,8 @@ // limitations under the License. /// Suppress gcc warning: ‘*((void*)& +4)’ may be used uninitialized in this function +#include +#include #if !__clang__ #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wmaybe-uninitialized" @@ -61,6 +63,15 @@ static String getNormalizedPath(const String & s) return getCanonicalPath(Poco::Path{s}.toString()); } +template +void readConfig(const std::shared_ptr & table, const String & name, T & value) +{ + if (auto p = table->get_qualified_as::type>(name); p) + { + value = *p; + } +} + void TiFlashStorageConfig::parseStoragePath(const String & storage, const LoggerPtr & log) { std::istringstream ss(storage); @@ -376,6 +387,11 @@ std::tuple TiFlashStorageConfig::parseSettings(Poc } } + if (config.has("storage.s3")) + { + storage_config.s3_config.parse(config.getString("storage.s3"), log); + } + return std::make_tuple(global_capacity_quota, storage_config); } @@ -385,26 +401,19 @@ void StorageIORateLimitConfig::parse(const String & storage_io_rate_limit, const cpptoml::parser p(ss); auto config = p.parse(); - auto read_config = [&](const std::string & name, auto & value) { - if (auto p = config->get_qualified_as::type>(name); p) - { - value = *p; - } - }; - - read_config("max_bytes_per_sec", max_bytes_per_sec); - read_config("max_read_bytes_per_sec", max_read_bytes_per_sec); - read_config("max_write_bytes_per_sec", max_write_bytes_per_sec); - read_config("foreground_write_weight", fg_write_weight); - read_config("background_write_weight", bg_write_weight); - read_config("foreground_read_weight", fg_read_weight); - read_config("background_read_weight", bg_read_weight); - read_config("emergency_pct", emergency_pct); - read_config("high_pct", high_pct); - read_config("medium_pct", medium_pct); - read_config("tune_base", tune_base); - read_config("min_bytes_per_sec", min_bytes_per_sec); - read_config("auto_tune_sec", auto_tune_sec); + readConfig(config, "max_bytes_per_sec", max_bytes_per_sec); + readConfig(config, "max_read_bytes_per_sec", max_read_bytes_per_sec); + readConfig(config, "max_write_bytes_per_sec", max_write_bytes_per_sec); + readConfig(config, "foreground_write_weight", fg_write_weight); + readConfig(config, "background_write_weight", bg_write_weight); + readConfig(config, "foreground_read_weight", fg_read_weight); + readConfig(config, "background_read_weight", bg_read_weight); + readConfig(config, "emergency_pct", emergency_pct); + readConfig(config, "high_pct", high_pct); + readConfig(config, "medium_pct", medium_pct); + readConfig(config, "tune_base", tune_base); + readConfig(config, "min_bytes_per_sec", min_bytes_per_sec); + readConfig(config, "auto_tune_sec", auto_tune_sec); use_max_bytes_per_sec = (max_read_bytes_per_sec == 0 && max_write_bytes_per_sec == 0); @@ -511,4 +520,26 @@ bool StorageIORateLimitConfig::operator==(const StorageIORateLimitConfig & confi && config.emergency_pct == emergency_pct && config.high_pct == high_pct && config.medium_pct == medium_pct && config.tune_base == tune_base && config.min_bytes_per_sec == min_bytes_per_sec && config.auto_tune_sec == auto_tune_sec; } + +void StorageS3Config::parse(const String & content, const LoggerPtr & log) +{ + std::istringstream ss(content); + cpptoml::parser p(ss); + auto table = p.parse(); + + readConfig(table, "endpoint", endpoint); + readConfig(table, "bucket", bucket); + + access_key_id = Poco::Environment::get("AWS_ACCESS_KEY_ID", /*default*/ ""); + secret_access_key = Poco::Environment::get("AWS_SECRET_ACCESS_KEY", /*default*/ ""); + + LOG_INFO(log, "endpoint={} bucket={} isS3Enabled={}", endpoint, bucket, isS3Enabled()); +} + +bool StorageS3Config::isS3Enabled() const +{ + return !endpoint.empty() && !bucket.empty() && !access_key_id.empty() && !secret_access_key.empty(); +} + + } // namespace DB diff --git a/dbms/src/Server/StorageConfigParser.h b/dbms/src/Server/StorageConfigParser.h index f3a779d2f63..32973643447 100644 --- a/dbms/src/Server/StorageConfigParser.h +++ b/dbms/src/Server/StorageConfigParser.h @@ -93,6 +93,17 @@ struct StorageIORateLimitConfig bool operator==(const StorageIORateLimitConfig & config) const; }; +struct StorageS3Config +{ + String endpoint; + String bucket; + String access_key_id; + String secret_access_key; + + void parse(const String & content, const LoggerPtr & log); + bool isS3Enabled() const; +}; + struct TiFlashStorageConfig { public: @@ -105,6 +116,8 @@ struct TiFlashStorageConfig UInt64 format_version = 0; bool lazily_init_store = true; + StorageS3Config s3_config; + public: TiFlashStorageConfig() = default; diff --git a/dbms/src/Server/tests/gtest_storage_config.cpp b/dbms/src/Server/tests/gtest_storage_config.cpp index 7d1dcc2cb28..4452bd3faf2 100644 --- a/dbms/src/Server/tests/gtest_storage_config.cpp +++ b/dbms/src/Server/tests/gtest_storage_config.cpp @@ -13,6 +13,7 @@ // limitations under the License. /// Suppress gcc warning: ‘*((void*)& +4)’ may be used uninitialized in this function +#include #if !__clang__ #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wmaybe-uninitialized" @@ -716,5 +717,74 @@ background_read_weight=2 } } CATCH + +std::pair getS3Env() +{ + return {Poco::Environment::get("AWS_ACCESS_KEY_ID", /*default*/ ""), + Poco::Environment::get("AWS_SECRET_ACCESS_KEY", /*default*/ "")}; +} + +void setS3Env(const String & id, const String & key) +{ + Poco::Environment::set("AWS_ACCESS_KEY_ID", id); + Poco::Environment::set("AWS_SECRET_ACCESS_KEY", key); +} + +TEST_F(StorageConfigTest, S3Config) +try +{ + Strings tests = { + R"( +[storage] +[storage.main] +dir = ["123"] +[storage.s3] + )", + R"( +[storage] +[storage.main] +dir = ["123"] +[storage.s3] +endpoint = "127.0.0.1:8080" +bucket = "s3_bucket" + )", + }; + + auto id_key = getS3Env(); + SCOPE_EXIT({ + setS3Env(id_key.first, id_key.second); + }); + for (size_t i = 0; i < tests.size(); ++i) + { + const auto & test_case = tests[i]; + auto config = loadConfigFromString(test_case); + LOG_INFO(log, "parsing [index={}] [content={}]", i, test_case); + const String test_id{"abcdefgh"}; + const String test_key{"1234567890"}; + setS3Env(test_id, test_key); + auto [global_capacity_quota, storage] = TiFlashStorageConfig::parseSettings(*config, log); + const auto & s3_config = storage.s3_config; + ASSERT_EQ(s3_config.access_key_id, test_id); + ASSERT_EQ(s3_config.secret_access_key, test_key); + if (i == 0) + { + ASSERT_TRUE(s3_config.endpoint.empty()); + ASSERT_TRUE(s3_config.bucket.empty()); + ASSERT_FALSE(s3_config.isS3Enabled()); + } + else if (i == 1) + { + ASSERT_EQ(s3_config.endpoint, "127.0.0.1:8080"); + ASSERT_EQ(s3_config.bucket, "s3_bucket"); + ASSERT_TRUE(s3_config.isS3Enabled()); + } + else + { + throw Exception("Not support"); + } + } +} +CATCH + } // namespace tests } // namespace DB diff --git a/dbms/src/Storages/S3/S3Common.cpp b/dbms/src/Storages/S3/S3Common.cpp new file mode 100644 index 00000000000..25decfefc21 --- /dev/null +++ b/dbms/src/Storages/S3/S3Common.cpp @@ -0,0 +1,293 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace ProfileEvents +{ +extern const Event S3HeadObject; +} + +namespace DB::ErrorCodes +{ +extern const int S3_ERROR; +} + +namespace +{ + +Poco::Message::Priority convertLogLevel(Aws::Utils::Logging::LogLevel log_level) +{ + switch (log_level) + { + case Aws::Utils::Logging::LogLevel::Off: + case Aws::Utils::Logging::LogLevel::Fatal: + case Aws::Utils::Logging::LogLevel::Error: + return Poco::Message::PRIO_ERROR; + case Aws::Utils::Logging::LogLevel::Warn: + return Poco::Message::PRIO_WARNING; + case Aws::Utils::Logging::LogLevel::Info: + return Poco::Message::PRIO_INFORMATION; + case Aws::Utils::Logging::LogLevel::Debug: + return Poco::Message::PRIO_DEBUG; + case Aws::Utils::Logging::LogLevel::Trace: + return Poco::Message::PRIO_TRACE; + default: + return Poco::Message::PRIO_INFORMATION; + } +} + +class AWSLogger final : public Aws::Utils::Logging::LogSystemInterface +{ +public: + AWSLogger() + : default_logger(&Poco::Logger::get("AWSClient")) + {} + + ~AWSLogger() final = default; + + Aws::Utils::Logging::LogLevel GetLogLevel() const final + { + return Aws::Utils::Logging::LogLevel::Info; + } + + void Log(Aws::Utils::Logging::LogLevel log_level, const char * tag, const char * format_str, ...) final // NOLINT + { + callLogImpl(log_level, tag, format_str); + } + + void LogStream(Aws::Utils::Logging::LogLevel log_level, const char * tag, const Aws::OStringStream & message_stream) final + { + callLogImpl(log_level, tag, message_stream.str().c_str()); + } + + void callLogImpl(Aws::Utils::Logging::LogLevel log_level, const char * tag, const char * message) + { + auto prio = convertLogLevel(log_level); + LOG_IMPL(default_logger, prio, "tag={} message={}", tag, message); + } + + void Flush() final {} + +private: + Poco::Logger * default_logger; +}; + +} // namespace + + +namespace DB +{ +namespace S3 +{ +void ClientFactory::init(const StorageS3Config & config_) +{ + config = config_; + Aws::InitAPI(aws_options); + Aws::Utils::Logging::InitializeAWSLogging(std::make_shared()); +} + +void ClientFactory::shutdown() +{ + Aws::Utils::Logging::ShutdownAWSLogging(); + Aws::ShutdownAPI(aws_options); +} + +ClientFactory::~ClientFactory() = default; + +ClientFactory & ClientFactory::instance() +{ + static ClientFactory ret; + return ret; +} + +std::unique_ptr ClientFactory::create() const +{ + auto schema = parseScheme(config.endpoint); + return create( + config.endpoint, + schema, + schema == Aws::Http::Scheme::HTTPS, + config.access_key_id, + config.secret_access_key); +} + +std::unique_ptr ClientFactory::create( + const String & endpoint, + Aws::Http::Scheme scheme, + bool verifySSL, + const String & access_key_id, + const String & secret_access_key) +{ + Aws::Client::ClientConfiguration cfg; + cfg.endpointOverride = endpoint; + cfg.scheme = scheme; + cfg.verifySSL = verifySSL; + Aws::Auth::AWSCredentials cred(access_key_id, secret_access_key); + return std::make_unique( + cred, + cfg, + Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, + /*useVirtualAddressing*/ true); +} + +Aws::Http::Scheme ClientFactory::parseScheme(std::string_view endpoint) +{ + return boost::algorithm::starts_with(endpoint, "https://") ? Aws::Http::Scheme::HTTPS : Aws::Http::Scheme::HTTP; +} + +bool isNotFoundError(Aws::S3::S3Errors error) +{ + return error == Aws::S3::S3Errors::RESOURCE_NOT_FOUND || error == Aws::S3::S3Errors::NO_SUCH_KEY; +} + +Aws::S3::Model::HeadObjectOutcome headObject(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id) +{ + ProfileEvents::increment(ProfileEvents::S3HeadObject); + Aws::S3::Model::HeadObjectRequest req; + req.SetBucket(bucket); + req.SetKey(key); + if (!version_id.empty()) + { + req.SetVersionId(version_id); + } + return client.HeadObject(req); +} + +S3::ObjectInfo getObjectInfo(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool throw_on_error) +{ + auto outcome = headObject(client, bucket, key, version_id); + + if (outcome.IsSuccess()) + { + auto read_result = outcome.GetResultWithOwnership(); + return {.size = static_cast(read_result.GetContentLength()), .last_modification_time = read_result.GetLastModified().Millis() / 1000}; + } + else if (throw_on_error) + { + const auto & error = outcome.GetError(); + throw DB::Exception(ErrorCodes::S3_ERROR, + "Failed to HEAD object: {}. HTTP response code: {}", + error.GetMessage(), + static_cast(error.GetResponseCode())); + } + return {}; +} + +size_t getObjectSize(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool throw_on_error) +{ + return getObjectInfo(client, bucket, key, version_id, throw_on_error).size; +} + +bool objectExists(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id) +{ + auto outcome = headObject(client, bucket, key, version_id); + if (outcome.IsSuccess()) + { + return true; + } + const auto & error = outcome.GetError(); + if (isNotFoundError(error.GetErrorType())) + { + return false; + } + throw Exception(ErrorCodes::S3_ERROR, + "Failed to check existence of key {} in bucket {}: {}", + key, + bucket, + error.GetMessage()); +} + +void uploadFile(const Aws::S3::S3Client & client, const String & bucket, const String & local_fname, const String & remote_fname) +{ + Stopwatch sw; + Aws::S3::Model::PutObjectRequest req; + req.SetBucket(bucket); + req.SetKey(remote_fname); + auto istr = Aws::MakeShared("PutObjectInputStream", local_fname, std::ios_base::in | std::ios_base::binary); + req.SetBody(istr); + req.SetContentType("binary/octet-stream"); + auto result = client.PutObject(req); + RUNTIME_CHECK_MSG(result.IsSuccess(), + "S3 PutObject failed, local_fname={}, remote_fname={}, exception={}, message={}", + local_fname, + remote_fname, + result.GetError().GetExceptionName(), + result.GetError().GetMessage()); + static auto * log = &Poco::Logger::get("S3PutObject"); + LOG_DEBUG(log, "local_fname={}, remote_fname={}, cost={}ms", local_fname, remote_fname, sw.elapsedMilliseconds()); +} + +void downloadFile(const Aws::S3::S3Client & client, const String & bucket, const String & local_fname, const String & remote_fname) +{ + Stopwatch sw; + Aws::S3::Model::GetObjectRequest req; + req.SetBucket(bucket); + req.SetKey(remote_fname); + auto result = client.GetObject(req); + RUNTIME_CHECK_MSG(result.IsSuccess(), + "S3 GetObject failed, local_fname={}, remote_fname={}, exception={}, message={}", + local_fname, + remote_fname, + result.GetError().GetExceptionName(), + result.GetError().GetMessage()); + Aws::OFStream ostr(local_fname, std::ios_base::out | std::ios_base::binary); + ostr << result.GetResult().GetBody().rdbuf(); + static auto * log = &Poco::Logger::get("S3GetObject"); + LOG_DEBUG(log, "local_fname={}, remote_fname={}, cost={}ms", local_fname, remote_fname, sw.elapsedMilliseconds()); +} + +std::unordered_map listPrefix(const Aws::S3::S3Client & client, const String & bucket, const String & prefix) +{ + Stopwatch sw; + Aws::S3::Model::ListObjectsRequest req; + req.SetBucket(bucket); + req.SetPrefix(prefix); + auto result = client.ListObjects(req); + RUNTIME_CHECK_MSG(result.IsSuccess(), + "S3 ListObjects failed, prefix={}, exception={}, message={}", + prefix, + result.GetError().GetExceptionName(), + result.GetError().GetMessage()); + const auto & objects = result.GetResult().GetContents(); + std::unordered_map keys_with_size; + keys_with_size.reserve(objects.size()); + for (const auto & object : objects) + { + keys_with_size.emplace(object.GetKey().substr(prefix.size()), object.GetSize()); // Cut prefix + } + static auto * log = &Poco::Logger::get("S3ListObjects"); + LOG_DEBUG(log, "prefix={}, keys={}, cost={}", prefix, keys_with_size, sw.elapsedMilliseconds()); + return keys_with_size; +} + +} // namespace S3 + +} // namespace DB diff --git a/dbms/src/Storages/S3/S3Common.h b/dbms/src/Storages/S3/S3Common.h new file mode 100644 index 00000000000..b4d2bb824ea --- /dev/null +++ b/dbms/src/Storages/S3/S3Common.h @@ -0,0 +1,77 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +namespace DB::S3 +{ + +class ClientFactory +{ +public: + ~ClientFactory(); + + static ClientFactory & instance(); + + void init(const StorageS3Config & config_); + void shutdown(); + std::unique_ptr create() const; + + static std::unique_ptr create( + const String & endpoint, + Aws::Http::Scheme scheme, + bool verifySSL, + const String & access_key_id, + const String & secret_access_key); + + static Aws::Http::Scheme parseScheme(std::string_view endpoint); + +private: + ClientFactory() = default; + DISALLOW_COPY_AND_MOVE(ClientFactory); + + Aws::SDKOptions aws_options; + StorageS3Config config; +}; + +struct ObjectInfo +{ + size_t size = 0; + time_t last_modification_time = 0; +}; + +bool isNotFoundError(Aws::S3::S3Errors error); + +Aws::S3::Model::HeadObjectOutcome headObject(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id = ""); + +S3::ObjectInfo getObjectInfo(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool throw_on_error); + +size_t getObjectSize(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool throw_on_error); + +bool objectExists(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id = ""); + +void uploadFile(const Aws::S3::S3Client & client, const String & bucket, const String & local_fname, const String & remote_fname); + +void downloadFile(const Aws::S3::S3Client & client, const String & bucket, const String & local_fname, const String & remote_fname); + +std::unordered_map listPrefix(const Aws::S3::S3Client & client, const String & bucket, const String & prefix); +} // namespace DB::S3 \ No newline at end of file