From 627fd1fa7deae195d39ca2dc712673fcce6d42e6 Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger <60744015+Lloyd-Pottiger@users.noreply.github.com> Date: Fri, 6 Sep 2024 22:14:30 +0800 Subject: [PATCH] Compression: Support enabling lightweight compression (#9312) close pingcap/tiflash#8982, close pingcap/tiflash#9336 Compression: Support enabling lightweight compression Signed-off-by: Lloyd-Pottiger Co-authored-by: jinhelin --- .clangd | 4 +- dbms/src/Common/TiFlashMetrics.h | 18 +- .../MergeSortingBlockInputStream.cpp | 1 - .../Compression/CompressionCodecDeltaFOR.cpp | 16 +- .../IO/Compression/CompressionCodecFOR.cpp | 9 +- .../Compression/CompressionCodecFactory.cpp | 11 +- .../IO/Compression/CompressionCodecLZ4.cpp | 8 - dbms/src/IO/Compression/CompressionCodecLZ4.h | 6 + .../CompressionCodecLightweight.cpp | 12 +- .../Compression/CompressionCodecLightweight.h | 77 ++-- .../CompressionCodecLightweight_Integer.cpp | 212 ++++++----- .../Compression/CompressionCodecRunLength.cpp | 20 +- .../src/IO/Compression/CompressionCodecZSTD.h | 4 + .../IO/Compression/CompressionSettings.cpp | 34 +- dbms/src/IO/Compression/CompressionSettings.h | 15 +- dbms/src/IO/Compression/EncodingUtil.cpp | 356 +++++++++++++++--- dbms/src/IO/Compression/EncodingUtil.h | 150 ++------ dbms/src/IO/Compression/tests/bench_codec.cpp | 41 ++ .../tests/gtest_codec_compression.cpp | 1 - dbms/src/Interpreters/SettingsCommon.h | 75 ++-- dbms/src/Server/tests/gtest_server_config.cpp | 18 + .../ColumnFile/ColumnFilePersisted.cpp | 6 +- .../Storages/DeltaMerge/File/DMFileWriter.h | 24 +- .../DeltaMerge/tests/gtest_dm_file.cpp | 27 +- metrics/grafana/tiflash_summary.json | 230 ++++++++++- .../tiflash_dt_lightweight_compression.toml | 50 +++ .../tiflash-dt-lightweight-compression.yaml | 43 +++ tests/tidb-ci/lightweight_compression | 1 + tests/tidb-ci/run.sh | 9 + .../tiflash-dt-lightweight-compression.yaml | 1 + 30 files changed, 1051 insertions(+), 428 deletions(-) create mode 100644 
tests/docker/config/tiflash_dt_lightweight_compression.toml create mode 100644 tests/docker/tiflash-dt-lightweight-compression.yaml create mode 120000 tests/tidb-ci/lightweight_compression create mode 120000 tests/tidb-ci/tiflash-dt-lightweight-compression.yaml diff --git a/.clangd b/.clangd index ef516ada329..18cca3ed557 100644 --- a/.clangd +++ b/.clangd @@ -1,2 +1,4 @@ CompileFlags: - Add: -ferror-limit=0 + Add: + - -ferror-limit=0 + - -Wno-vla-cxx-extension diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index 4a5bf82e124..7fa1207cba6 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -868,7 +868,23 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva "Durations of read thread internal components", \ Histogram, \ F(type_block_queue_pop_latency, {{"type", "block_queue_pop_latency"}}, ExpBuckets{1, 2, 20}), \ - F(type_schedule_one_round, {{"type", "schedule_one_round"}}, ExpBuckets{1, 2, 20})) + F(type_schedule_one_round, {{"type", "schedule_one_round"}}, ExpBuckets{1, 2, 20})) \ + M(tiflash_storage_pack_compression_algorithm_count, \ + "The count of the compression algorithm used by each data part", \ + Counter, \ + F(type_constant, {"type", "constant"}), \ + F(type_constant_delta, {"type", "constant_delta"}), \ + F(type_runlength, {"type", "runlength"}), \ + F(type_for, {"type", "for"}), \ + F(type_delta_for, {"type", "delta_for"}), \ + F(type_lz4, {"type", "lz4"})) \ + M(tiflash_storage_pack_compression_bytes, \ + "The uncompression/compression bytes of lz4 and lightweight", \ + Counter, \ + F(type_lz4_compressed_bytes, {"type", "lz4_compressed_bytes"}), \ + F(type_lz4_uncompressed_bytes, {"type", "lz4_uncompressed_bytes"}), \ + F(type_lightweight_compressed_bytes, {"type", "lightweight_compressed_bytes"}), \ + F(type_lightweight_uncompressed_bytes, {"type", "lightweight_uncompressed_bytes"})) /// Buckets with boundaries [start * base^0, start * base^1, ..., 
start * base^(size-1)] diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp index 84073ff1376..ca8418faf64 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp @@ -20,7 +20,6 @@ #include #include #include -#include #include namespace DB diff --git a/dbms/src/IO/Compression/CompressionCodecDeltaFOR.cpp b/dbms/src/IO/Compression/CompressionCodecDeltaFOR.cpp index 8ec7455927d..4e14bbcd159 100644 --- a/dbms/src/IO/Compression/CompressionCodecDeltaFOR.cpp +++ b/dbms/src/IO/Compression/CompressionCodecDeltaFOR.cpp @@ -17,7 +17,6 @@ #include #include #include -#include #include #include @@ -60,17 +59,16 @@ UInt32 compressData(const char * source, UInt32 source_size, char * dest) if unlikely (source_size % bytes_size != 0) throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "source size {} is not aligned to {}", source_size, bytes_size); const auto count = source_size / bytes_size; - DB::Compression::deltaEncoding(reinterpret_cast(source), count, reinterpret_cast(dest)); if (unlikely(count == 1)) return bytes_size; - // Cast deltas to signed type to better compress negative values. - // For example, if we have a sequence of UInt8 values [3, 2, 1, 0], the deltas will be [3, -1, -1, -1] - // If we compress them as UInt8, we will get [3, 255, 255, 255], which is not optimal. 
using TS = typename std::make_signed::type; - auto for_size = DB::CompressionCodecFOR::compressData( - reinterpret_cast(dest + bytes_size), - source_size - bytes_size, - dest + bytes_size); + // view source as signed integers so that delta will be smaller + DB::Compression::deltaEncoding(reinterpret_cast(source), count, reinterpret_cast(dest)); + // do ZigZag encoding on deltas to make deltas as positive integers + auto * deltas = reinterpret_cast(dest + bytes_size); + DB::Compression::zigZagEncoding(deltas, count - 1, deltas); + // compress deltas using FOR + auto for_size = DB::CompressionCodecFOR::compressData(deltas, source_size - bytes_size, dest + bytes_size); return bytes_size + for_size; } diff --git a/dbms/src/IO/Compression/CompressionCodecFOR.cpp b/dbms/src/IO/Compression/CompressionCodecFOR.cpp index eec50acb4f8..4c6e0730cdf 100644 --- a/dbms/src/IO/Compression/CompressionCodecFOR.cpp +++ b/dbms/src/IO/Compression/CompressionCodecFOR.cpp @@ -16,7 +16,6 @@ #include #include #include -#include #include #include @@ -60,9 +59,11 @@ UInt32 CompressionCodecFOR::compressData(const T * source, UInt32 source_size, c if unlikely (count == 0) throw Exception(ErrorCodes::CANNOT_COMPRESS, "Cannot compress empty data"); std::vector values(source, source + count); - T frame_of_reference = *std::min_element(values.cbegin(), values.cend()); - UInt8 width = DB::Compression::FOREncodingWidth(values, frame_of_reference); - return DB::Compression::FOREncoding>(values, frame_of_reference, width, dest); + auto minmax = std::minmax_element(values.cbegin(), values.cend()); + T frame_of_reference = *minmax.first; + T max_value = *minmax.second; + UInt8 width = BitpackingPrimitives::minimumBitWidth(max_value - frame_of_reference); + return DB::Compression::FOREncoding(values.data(), values.size(), frame_of_reference, width, dest); } UInt32 CompressionCodecFOR::doCompressData(const char * source, UInt32 source_size, char * dest) const diff --git 
a/dbms/src/IO/Compression/CompressionCodecFactory.cpp b/dbms/src/IO/Compression/CompressionCodecFactory.cpp index e0aec4619cd..85bae075aa6 100644 --- a/dbms/src/IO/Compression/CompressionCodecFactory.cpp +++ b/dbms/src/IO/Compression/CompressionCodecFactory.cpp @@ -25,6 +25,7 @@ #include #include + #if USE_QPL #include #endif @@ -77,7 +78,6 @@ template CompressionCodecPtr CompressionCodecFactory::getStaticCodec( const CompressionSetting & setting); - template <> CompressionCodecPtr CompressionCodecFactory::getStaticCodec(const CompressionSetting & setting) { @@ -154,7 +154,6 @@ CompressionCodecPtr CompressionCodecFactory::getStaticCodec CompressionCodecPtr CompressionCodecFactory::create(const CompressionSetting & setting) { @@ -182,7 +181,13 @@ CompressionCodecPtr CompressionCodecFactory::create(const CompressionSetting & s if (!isInteger(setting.data_type)) { if (setting.method_byte == CompressionMethodByte::Lightweight) + { + // Use LZ4 codec for non-integral types + // TODO: maybe we can use zstd? 
+ auto method = CompressionMethod::LZ4; + CompressionSetting setting(method, CompressionSetting::getDefaultLevel(method)); return getStaticCodec(setting); + } else return nullptr; } @@ -192,7 +197,7 @@ CompressionCodecPtr CompressionCodecFactory::create(const CompressionSetting & s switch (setting.method_byte) { case CompressionMethodByte::Lightweight: - return std::make_unique(setting.data_type); + return std::make_unique(setting.data_type, setting.level); case CompressionMethodByte::DeltaFOR: return getStaticCodec(setting); case CompressionMethodByte::RunLength: diff --git a/dbms/src/IO/Compression/CompressionCodecLZ4.cpp b/dbms/src/IO/Compression/CompressionCodecLZ4.cpp index 5f0aa8719ae..3027d276c3b 100644 --- a/dbms/src/IO/Compression/CompressionCodecLZ4.cpp +++ b/dbms/src/IO/Compression/CompressionCodecLZ4.cpp @@ -25,8 +25,6 @@ namespace ErrorCodes { extern const int CANNOT_COMPRESS; extern const int CANNOT_DECOMPRESS; -extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE; -extern const int ILLEGAL_CODEC_PARAMETER; } // namespace ErrorCodes CompressionCodecLZ4::CompressionCodecLZ4(int level_) @@ -72,10 +70,4 @@ CompressionCodecLZ4HC::CompressionCodecLZ4HC(int level_) : CompressionCodecLZ4(level_) {} - -CompressionCodecPtr getCompressionCodecLZ4(int level) -{ - return std::make_unique(level); -} - } // namespace DB diff --git a/dbms/src/IO/Compression/CompressionCodecLZ4.h b/dbms/src/IO/Compression/CompressionCodecLZ4.h index 2ded3442d90..9423e867119 100644 --- a/dbms/src/IO/Compression/CompressionCodecLZ4.h +++ b/dbms/src/IO/Compression/CompressionCodecLZ4.h @@ -19,9 +19,14 @@ namespace DB { +class CompressionCodecFactory; + class CompressionCodecLZ4 : public ICompressionCodec { public: + // The official document says that the compression ratio of LZ4 is 2.1, https://github.com/lz4/lz4 + static constexpr size_t ESTIMATE_INTEGER_COMPRESSION_RATIO = 3; + explicit CompressionCodecLZ4(int level_); UInt8 getMethodByte() const override; @@ -39,6 +44,7 @@ class 
CompressionCodecLZ4 : public ICompressionCodec protected: const int level; + friend class CompressionCodecFactory; }; diff --git a/dbms/src/IO/Compression/CompressionCodecLightweight.cpp b/dbms/src/IO/Compression/CompressionCodecLightweight.cpp index 2f93158b076..ac297c17cac 100644 --- a/dbms/src/IO/Compression/CompressionCodecLightweight.cpp +++ b/dbms/src/IO/Compression/CompressionCodecLightweight.cpp @@ -23,7 +23,6 @@ namespace DB { -// TODO: metrics namespace ErrorCodes { @@ -31,8 +30,9 @@ extern const int CANNOT_COMPRESS; extern const int CANNOT_DECOMPRESS; } // namespace ErrorCodes -CompressionCodecLightweight::CompressionCodecLightweight(CompressionDataType data_type_) - : data_type(data_type_) +CompressionCodecLightweight::CompressionCodecLightweight(CompressionDataType data_type_, int level_) + : ctx(level_) + , data_type(data_type_) {} UInt8 CompressionCodecLightweight::getMethodByte() const @@ -46,12 +46,6 @@ UInt32 CompressionCodecLightweight::getMaxCompressedDataSize(UInt32 uncompressed return 1 + 1 + LZ4_COMPRESSBOUND(uncompressed_size); } -CompressionCodecLightweight::~CompressionCodecLightweight() -{ - if (ctx.isCompression()) - LOG_INFO(Logger::get(), "lightweight codec: {}", ctx.toDebugString()); -} - UInt32 CompressionCodecLightweight::doCompressData(const char * source, UInt32 source_size, char * dest) const { dest[0] = magic_enum::enum_integer(data_type); diff --git a/dbms/src/IO/Compression/CompressionCodecLightweight.h b/dbms/src/IO/Compression/CompressionCodecLightweight.h index 7734ded7bd1..1cc1cdfca14 100644 --- a/dbms/src/IO/Compression/CompressionCodecLightweight.h +++ b/dbms/src/IO/Compression/CompressionCodecLightweight.h @@ -34,13 +34,13 @@ namespace DB class CompressionCodecLightweight : public ICompressionCodec { public: - explicit CompressionCodecLightweight(CompressionDataType data_type_); + explicit CompressionCodecLightweight(CompressionDataType data_type_, int level_); UInt8 getMethodByte() const override; - bool 
isCompression() const override { return true; } + ~CompressionCodecLightweight() override = default; - ~CompressionCodecLightweight() override; + bool isCompression() const override { return true; } protected: UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override; @@ -55,22 +55,19 @@ class CompressionCodecLightweight : public ICompressionCodec enum class IntegerMode : UInt8 { Invalid = 0, - CONSTANT = 1, // all values are the same - CONSTANT_DELTA = 2, // the difference between two adjacent values is the same - RunLength = 3, // run-length encoding + Constant = 1, // all values are the same + ConstantDelta = 2, // the difference between two adjacent values is the same + RunLength = 3, // the same value appears multiple times FOR = 4, // Frame of Reference encoding - DELTA_FOR = 5, // delta encoding and then FOR encoding + DeltaFOR = 5, // delta encoding and then FOR encoding LZ4 = 6, // the above modes are not suitable, use LZ4 instead }; // Constant or ConstantDelta - template + template using ConstantState = T; - template - using RunLengthState = std::vector>; - - template + template struct FORState { std::vector values; @@ -78,63 +75,59 @@ class CompressionCodecLightweight : public ICompressionCodec UInt8 bit_width; }; - template + template struct DeltaFORState { - using TS = typename std::make_signed_t; - std::vector deltas; - TS min_delta_value; + std::vector deltas; + T min_delta_value; UInt8 bit_width; }; // State is a union of different states for different modes - template - using IntegerState = std::variant, RunLengthState, FORState, DeltaFORState>; + template + using IntegerState = std::variant, FORState, DeltaFORState>; class IntegerCompressContext { public: - IntegerCompressContext() = default; + explicit IntegerCompressContext(int round_count_) + : round_count(round_count_) + {} - template + template void analyze(std::span & values, IntegerState & state); void update(size_t uncompressed_size, size_t 
compressed_size); - String toDebugString() const; - bool isCompression() const { return lz4_counter > 0 || lw_counter > 0; } - IntegerMode mode = IntegerMode::LZ4; private: bool needAnalyze() const; + + template bool needAnalyzeDelta() const; + + template + static constexpr bool needAnalyzeFOR(); + bool needAnalyzeRunLength() const; + void resetIfNeed(); + private: - // The threshold for the number of blocks to decide whether need to analyze. - // For example: - // If lz4 is used more than COUNT_THRESHOLD times and the compression ratio is better than lightweight codec, do not analyze anymore. - static constexpr size_t COUNT_THRESHOLD = 5; - // Assume that the compression ratio of LZ4 is 3.0 - // The official document says that the compression ratio of LZ4 is 2.1, https://github.com/lz4/lz4 - static constexpr size_t ESRTIMATE_LZ4_COMPRESSION_RATIO = 3; - - size_t lw_uncompressed_size = 0; - size_t lw_compressed_size = 0; - size_t lw_counter = 0; - size_t lz4_uncompressed_size = 0; - size_t lz4_compressed_size = 0; - size_t lz4_counter = 0; - size_t constant_delta_counter = 0; - size_t delta_for_counter = 0; - size_t rle_counter = 0; + // Every round_count blocks as a round, decide whether to analyze the mode. 
+ const int round_count; + int compress_count = 0; + bool used_lz4 = false; + bool used_constant_delta = false; + bool used_delta_for = false; + bool used_rle = false; }; - template + template size_t compressDataForInteger(const char * source, UInt32 source_size, char * dest) const; - template + template void decompressDataForInteger(const char * source, UInt32 source_size, char * dest, UInt32 output_size) const; /// Non-integer data diff --git a/dbms/src/IO/Compression/CompressionCodecLightweight_Integer.cpp b/dbms/src/IO/Compression/CompressionCodecLightweight_Integer.cpp index 6ce2b51bbcc..2c83d395c1d 100644 --- a/dbms/src/IO/Compression/CompressionCodecLightweight_Integer.cpp +++ b/dbms/src/IO/Compression/CompressionCodecLightweight_Integer.cpp @@ -13,11 +13,14 @@ // limitations under the License. #include +#include +#include #include #include #include #include + namespace DB { @@ -27,66 +30,92 @@ extern const int CANNOT_COMPRESS; extern const int CANNOT_DECOMPRESS; } // namespace ErrorCodes -String CompressionCodecLightweight::IntegerCompressContext::toDebugString() const -{ - return fmt::format( - "lz4: {}, lightweight: {}, constant_delta: {}, delta_for: {}, rle: {}, lz4 {} -> {}, lightweight {} -> {}", - lz4_counter, - lw_counter, - constant_delta_counter, - delta_for_counter, - rle_counter, - lz4_uncompressed_size, - lz4_compressed_size, - lw_uncompressed_size, - lw_compressed_size); -} - void CompressionCodecLightweight::IntegerCompressContext::update(size_t uncompressed_size, size_t compressed_size) { if (mode == IntegerMode::LZ4) { - lz4_uncompressed_size += uncompressed_size; - lz4_compressed_size += compressed_size; - ++lz4_counter; + GET_METRIC(tiflash_storage_pack_compression_bytes, type_lz4_uncompressed_bytes).Increment(uncompressed_size); + GET_METRIC(tiflash_storage_pack_compression_bytes, type_lz4_compressed_bytes).Increment(compressed_size); + GET_METRIC(tiflash_storage_pack_compression_algorithm_count, type_lz4).Increment(); + used_lz4 = true; 
} else { - lw_uncompressed_size += uncompressed_size; - lw_compressed_size += compressed_size; - ++lw_counter; + GET_METRIC(tiflash_storage_pack_compression_bytes, type_lightweight_uncompressed_bytes) + .Increment(uncompressed_size); + GET_METRIC(tiflash_storage_pack_compression_bytes, type_lightweight_compressed_bytes) + .Increment(compressed_size); + } + switch (mode) + { + case IntegerMode::Constant: + GET_METRIC(tiflash_storage_pack_compression_algorithm_count, type_constant).Increment(); + break; + case IntegerMode::ConstantDelta: + GET_METRIC(tiflash_storage_pack_compression_algorithm_count, type_constant_delta).Increment(); + used_constant_delta = true; + break; + case IntegerMode::RunLength: + GET_METRIC(tiflash_storage_pack_compression_algorithm_count, type_runlength).Increment(); + used_rle = true; + break; + case IntegerMode::FOR: + GET_METRIC(tiflash_storage_pack_compression_algorithm_count, type_for).Increment(); + break; + case IntegerMode::DeltaFOR: + GET_METRIC(tiflash_storage_pack_compression_algorithm_count, type_delta_for).Increment(); + used_delta_for = true; + break; + default: + break; + } + // Since analyze CONSTANT is extremely fast, so it will not be counted in the round. + if (mode != IntegerMode::Constant) + { + ++compress_count; + resetIfNeed(); } - if (mode == IntegerMode::CONSTANT_DELTA) - ++constant_delta_counter; - if (mode == IntegerMode::DELTA_FOR) - ++delta_for_counter; - if (mode == IntegerMode::RunLength) - ++rle_counter; } +// Every ROUND_COUNT times as a round. +// At the beginning of each round, analyze once. +// During the round, if once used lz4, do not analyze anymore, and use lz4 directly. 
bool CompressionCodecLightweight::IntegerCompressContext::needAnalyze() const { - // lightweight codec is never used, do not analyze anymore - if (lz4_counter > COUNT_THRESHOLD && lw_counter == 0) - return false; - // if lz4 is used more than COUNT_THRESHOLD times and the compression ratio is better than lightweight codec, do not analyze anymore - if (lz4_counter > COUNT_THRESHOLD - && lz4_uncompressed_size / lz4_compressed_size > lw_uncompressed_size / lw_compressed_size) - return false; - return true; + return compress_count == 0 || !used_lz4; +} + +void CompressionCodecLightweight::IntegerCompressContext::resetIfNeed() +{ + if (compress_count >= round_count) + { + compress_count = 0; + used_lz4 = false; + used_constant_delta = false; + used_delta_for = false; + used_rle = false; + } } +template bool CompressionCodecLightweight::IntegerCompressContext::needAnalyzeDelta() const { - return lw_counter <= COUNT_THRESHOLD || constant_delta_counter != 0 || delta_for_counter != 0; + return !std::is_same_v && (compress_count == 0 || used_constant_delta || used_delta_for); +} + +template +constexpr bool CompressionCodecLightweight::IntegerCompressContext::needAnalyzeFOR() +{ + // The performance of FOR is not good for UInt16, so we do not use FOR for UInt16. 
+ return !std::is_same_v; } bool CompressionCodecLightweight::IntegerCompressContext::needAnalyzeRunLength() const { - return lw_counter <= COUNT_THRESHOLD || rle_counter != 0; + return compress_count == 0 || used_rle; } -template +template void CompressionCodecLightweight::IntegerCompressContext::analyze(std::span & values, IntegerState & state) { if (values.empty()) @@ -97,7 +126,7 @@ void CompressionCodecLightweight::IntegerCompressContext::analyze(std::span; - std::vector deltas; + std::vector deltas; UInt8 delta_for_width = sizeof(T) * 8; - size_t delta_for_size = std::numeric_limits::max(); - TS min_delta = std::numeric_limits::min(); - if (needAnalyzeDelta()) + T min_delta = std::numeric_limits::max(); + if (needAnalyzeDelta()) { // Check CONSTANT_DELTA // If values.size() == 1, mode will be CONSTANT // so values.size() must be greater than 1 here and deltas must be non empty. - assert(values.size() > 1); deltas.reserve(values.size() - 1); for (size_t i = 1; i < values.size(); ++i) { - deltas.push_back(values[i] - values[i - 1]); + deltas.push_back(static_cast(values[i]) - static_cast(values[i - 1])); } - auto minmax_delta = std::minmax_element(deltas.cbegin(), deltas.cend()); - min_delta = *minmax_delta.first; - if (min_delta == *minmax_delta.second) + if (auto [min_delta, max_delta] = std::minmax_element(deltas.begin(), deltas.end()); *min_delta == *max_delta) { - state = static_cast(min_delta); - mode = IntegerMode::CONSTANT_DELTA; + state = static_cast(*min_delta); + mode = IntegerMode::ConstantDelta; return; } // DELTA_FOR - delta_for_width = Compression::FOREncodingWidth(deltas, min_delta); - // values[0], min_delta, 1 byte for width, and the rest for compressed data - static constexpr auto ADDTIONAL_BYTES = sizeof(T) + sizeof(UInt8) + sizeof(T); - delta_for_size = BitpackingPrimitives::getRequiredSize(deltas.size(), delta_for_width) + ADDTIONAL_BYTES; - } - - // RunLength - Compression::RunLengthPairs rle; - if (needAnalyzeRunLength()) - { - 
rle.reserve(values.size()); - rle.emplace_back(values[0], 1); - for (size_t i = 1; i < values.size(); ++i) + if constexpr (needAnalyzeFOR()) { - if (values[i] != values[i - 1] || rle.back().second == std::numeric_limits::max()) - rle.emplace_back(values[i], 1); - else - ++rle.back().second; + // Do ZigZag encoding for deltas + Compression::zigZagEncoding(deltas.data(), deltas.size(), deltas.data()); + auto minmax = std::minmax_element(deltas.begin(), deltas.end()); + min_delta = static_cast(*minmax.first); + delta_for_width = BitpackingPrimitives::minimumBitWidth(static_cast(*minmax.second) - min_delta); } } + // RunLength + size_t estimate_rle_size = Compression::runLengthEncodedApproximateSize(values.data(), values.size()); + + size_t estimate_lz_size = values.size() * sizeof(T) / CompressionCodecLZ4::ESTIMATE_INTEGER_COMPRESSION_RATIO; + UInt8 for_width = BitpackingPrimitives::minimumBitWidth(max_value - min_value); - // additional T bytes for min_delta, and 1 byte for width - static constexpr auto ADDTIONAL_BYTES = sizeof(T) + sizeof(UInt8); - size_t for_size = BitpackingPrimitives::getRequiredSize(values.size(), for_width) + ADDTIONAL_BYTES; - size_t estimate_lz_size = values.size() * sizeof(T) / ESRTIMATE_LZ4_COMPRESSION_RATIO; - size_t rle_size = rle.empty() ? 
std::numeric_limits::max() : Compression::runLengthPairsByteSize(rle); - if (needAnalyzeRunLength() && rle_size < delta_for_size && rle_size < for_size && rle_size < estimate_lz_size) + // min_delta, and 1 byte for width, and the rest for compressed data + static constexpr auto FOR_EXTRA_BYTES = sizeof(T) + sizeof(UInt8); + size_t for_size = BitpackingPrimitives::getRequiredSize(values.size(), for_width) + FOR_EXTRA_BYTES; + + // values[0], min_delta, 1 byte for width, and the rest for compressed data + static constexpr auto DFOR_EXTRA_BYTES = sizeof(T) + sizeof(UInt8) + sizeof(T); + size_t delta_for_size = BitpackingPrimitives::getRequiredSize(deltas.size(), delta_for_width) + DFOR_EXTRA_BYTES; + + if (needAnalyzeRunLength() && estimate_rle_size < delta_for_size && estimate_rle_size < for_size + && estimate_rle_size < estimate_lz_size) { - state = std::move(rle); mode = IntegerMode::RunLength; } - else if (for_size < delta_for_size && for_size < estimate_lz_size) + else if (needAnalyzeFOR() && for_size < delta_for_size && for_size < estimate_lz_size) { std::vector values_copy(values.begin(), values.end()); state = FORState{std::move(values_copy), min_value, for_width}; mode = IntegerMode::FOR; } - else if (needAnalyzeDelta() && delta_for_size < estimate_lz_size) + else if (needAnalyzeDelta() && delta_for_size < estimate_lz_size) { state = DeltaFORState{std::move(deltas), min_delta, delta_for_width}; - mode = IntegerMode::DELTA_FOR; + mode = IntegerMode::DeltaFOR; } else { @@ -188,7 +210,7 @@ void CompressionCodecLightweight::IntegerCompressContext::analyze(std::span +template size_t CompressionCodecLightweight::compressDataForInteger(const char * source, UInt32 source_size, char * dest) const { const auto bytes_size = static_cast(data_type); @@ -214,35 +236,41 @@ size_t CompressionCodecLightweight::compressDataForInteger(const char * source, size_t compressed_size = 1; switch (ctx.mode) { - case IntegerMode::CONSTANT: + case IntegerMode::Constant: { 
compressed_size += Compression::constantEncoding(std::get<0>(state), dest); break; } - case IntegerMode::CONSTANT_DELTA: + case IntegerMode::ConstantDelta: { compressed_size += Compression::constantDeltaEncoding(values[0], std::get<0>(state), dest); break; } case IntegerMode::RunLength: { - compressed_size += Compression::runLengthEncoding(std::get<1>(state), dest); + compressed_size += Compression::runLengthEncoding(values.data(), values.size(), dest); break; } case IntegerMode::FOR: { - FORState for_state = std::get<2>(state); - compressed_size += Compression::FOREncoding(for_state.values, for_state.min_value, for_state.bit_width, dest); + FORState for_state = std::get<1>(state); + compressed_size += Compression::FOREncoding( + for_state.values.data(), + for_state.values.size(), + for_state.min_value, + for_state.bit_width, + dest); break; } - case IntegerMode::DELTA_FOR: + case IntegerMode::DeltaFOR: { - DeltaFORState delta_for_state = std::get<3>(state); + DeltaFORState delta_for_state = std::get<2>(state); unalignedStore(dest, values[0]); dest += sizeof(T); compressed_size += sizeof(T); - compressed_size += Compression::FOREncoding, true>( - delta_for_state.deltas, + compressed_size += Compression::FOREncoding( + delta_for_state.deltas.data(), + delta_for_state.deltas.size(), delta_for_state.min_delta_value, delta_for_state.bit_width, dest); @@ -274,7 +302,7 @@ size_t CompressionCodecLightweight::compressDataForInteger(const char * source, return compressed_size; } -template +template void CompressionCodecLightweight::decompressDataForInteger( const char * source, UInt32 source_size, @@ -293,10 +321,10 @@ void CompressionCodecLightweight::decompressDataForInteger( source_size -= sizeof(UInt8); switch (mode) { - case IntegerMode::CONSTANT: + case IntegerMode::Constant: Compression::constantDecoding(source, source_size, dest, output_size); break; - case IntegerMode::CONSTANT_DELTA: + case IntegerMode::ConstantDelta: Compression::constantDeltaDecoding(source, 
source_size, dest, output_size); break; case IntegerMode::RunLength: @@ -305,7 +333,7 @@ void CompressionCodecLightweight::decompressDataForInteger( case IntegerMode::FOR: Compression::FORDecoding(source, source_size, dest, output_size); break; - case IntegerMode::DELTA_FOR: + case IntegerMode::DeltaFOR: Compression::deltaFORDecoding(source, source_size, dest, output_size); break; case IntegerMode::LZ4: diff --git a/dbms/src/IO/Compression/CompressionCodecRunLength.cpp b/dbms/src/IO/Compression/CompressionCodecRunLength.cpp index fda7d36d96b..72f03420df5 100644 --- a/dbms/src/IO/Compression/CompressionCodecRunLength.cpp +++ b/dbms/src/IO/Compression/CompressionCodecRunLength.cpp @@ -53,23 +53,11 @@ UInt32 compressDataForInteger(const char * source, UInt32 source_size, char * de constexpr auto bytes_size = sizeof(T); if unlikely (source_size % bytes_size != 0) throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "source size {} is not aligned to {}", source_size, bytes_size); - const char * source_end = source + source_size; - DB::Compression::RunLengthPairs rle_vec; - rle_vec.reserve(source_size / bytes_size); - for (const auto * src = source; src < source_end; src += bytes_size) - { - T value = unalignedLoad(src); - // If the value is different from the previous one or the counter is at the maximum value (255 + 1 = 0), - // we need to start a new run. - // Otherwise, we can just increment the counter. 
- if (rle_vec.empty() || rle_vec.back().first != value - || rle_vec.back().second == std::numeric_limits::max()) - rle_vec.emplace_back(value, 1); - else - ++rle_vec.back().second; - } - return DB::Compression::runLengthEncoding(rle_vec, dest); + const auto * typed_source = reinterpret_cast(source); + const auto source_count = source_size / bytes_size; + + return DB::Compression::runLengthEncoding(typed_source, source_count, dest); } } // namespace diff --git a/dbms/src/IO/Compression/CompressionCodecZSTD.h b/dbms/src/IO/Compression/CompressionCodecZSTD.h index 5c61e75dff0..f3f8d2f27bb 100644 --- a/dbms/src/IO/Compression/CompressionCodecZSTD.h +++ b/dbms/src/IO/Compression/CompressionCodecZSTD.h @@ -19,6 +19,9 @@ namespace DB { +class CompressionCodecFactory; + + class CompressionCodecZSTD : public ICompressionCodec { public: @@ -38,6 +41,7 @@ class CompressionCodecZSTD : public ICompressionCodec private: const int level; + friend class CompressionCodecFactory; }; } // namespace DB diff --git a/dbms/src/IO/Compression/CompressionSettings.cpp b/dbms/src/IO/Compression/CompressionSettings.cpp index 2e890a4f249..39755660585 100644 --- a/dbms/src/IO/Compression/CompressionSettings.cpp +++ b/dbms/src/IO/Compression/CompressionSettings.cpp @@ -12,12 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-#include "CompressionSettings.h" - #include +#include #include #include +#include + + namespace DB { CompressionSetting::CompressionSetting(const Settings & settings) @@ -48,9 +50,37 @@ int CompressionSetting::getDefaultLevel(CompressionMethod method) case CompressionMethod::QPL: return 1; #endif + case CompressionMethod::Lightweight: + return 3; default: return -1; } } +template +CompressionSetting CompressionSetting::create(T method, int level, const IDataType & type) +{ + // Nullable type will be treated as Unknown + CompressionSetting setting(method); + if (type.isValueRepresentedByInteger()) + { + auto data_type = magic_enum::enum_cast(type.getSizeOfValueInMemory()); + setting.data_type = data_type ? *data_type : CompressionDataType::Unknown; + } + else if (type.isFloatingPoint() && type.getSizeOfValueInMemory() == 4) + setting.data_type = CompressionDataType::Float32; + else if (type.isFloatingPoint() && type.getSizeOfValueInMemory() == 8) + setting.data_type = CompressionDataType::Float64; + // TODO: support String + // else if (type.isStringOrFixedString()) + // setting.data_type = CompressionDataType::String; + else + setting.data_type = CompressionDataType::Unknown; + setting.level = level; + return setting; +} + +template CompressionSetting CompressionSetting::create(CompressionMethod method, int level, const IDataType & type); +template CompressionSetting CompressionSetting::create(CompressionMethodByte method, int level, const IDataType & type); + } // namespace DB diff --git a/dbms/src/IO/Compression/CompressionSettings.h b/dbms/src/IO/Compression/CompressionSettings.h index 87ce30b60c5..a70eedff34b 100644 --- a/dbms/src/IO/Compression/CompressionSettings.h +++ b/dbms/src/IO/Compression/CompressionSettings.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include #include @@ -48,9 +49,12 @@ const std::unordered_map method_map = struct CompressionSetting { CompressionMethod method; - CompressionMethodByte method_byte; int level; + // The type of 
data to be compressed. + // It may be used to determine the codec to use. CompressionDataType data_type = CompressionDataType::Unknown; + // Always use method_byte to determine the codec to use except for LZ4HC codec + CompressionMethodByte method_byte; CompressionSetting() : CompressionSetting(CompressionMethod::LZ4) @@ -58,24 +62,27 @@ struct CompressionSetting explicit CompressionSetting(CompressionMethod method_) : method(method_) - , method_byte(method_byte_map[static_cast(method_)]) , level(getDefaultLevel(method)) + , method_byte(method_byte_map[static_cast(method_)]) {} explicit CompressionSetting(CompressionMethodByte method_byte_) : method(method_map.at(method_byte_)) - , method_byte(method_byte_) , level(getDefaultLevel(method)) + , method_byte(method_byte_) {} CompressionSetting(CompressionMethod method_, int level_) : method(method_) - , method_byte(method_byte_map[static_cast(method_)]) , level(level_) + , method_byte(method_byte_map[static_cast(method_)]) {} explicit CompressionSetting(const Settings & settings); + template + static CompressionSetting create(T method, int level, const IDataType & type); + static int getDefaultLevel(CompressionMethod method); }; diff --git a/dbms/src/IO/Compression/EncodingUtil.cpp b/dbms/src/IO/Compression/EncodingUtil.cpp index 29717c7d32b..6d98df1418f 100644 --- a/dbms/src/IO/Compression/EncodingUtil.cpp +++ b/dbms/src/IO/Compression/EncodingUtil.cpp @@ -21,6 +21,145 @@ namespace DB::Compression { +template +char * writeSameValueMultipleTime(T value, UInt32 count, char * dest) +{ + if (unlikely(count == 0)) + return dest; + if constexpr (sizeof(T) == 1) + { + memset(dest, value, count); + dest += count; + return dest; + } + UInt32 j = 0; +#if defined(__AVX2__) + // avx2 + while (j + sizeof(__m256i) / sizeof(T) <= count) + { + if constexpr (sizeof(T) == 2) + { + auto value_avx2 = _mm256_set1_epi16(value); + _mm256_storeu_si256(reinterpret_cast<__m256i *>(dest), value_avx2); + } + else if constexpr (sizeof(T) == 4) + 
{ + auto value_avx2 = _mm256_set1_epi32(value); + _mm256_storeu_si256(reinterpret_cast<__m256i *>(dest), value_avx2); + } + else if constexpr (sizeof(T) == 8) + { + auto value_avx2 = _mm256_set1_epi64x(value); + _mm256_storeu_si256(reinterpret_cast<__m256i *>(dest), value_avx2); + } + j += sizeof(__m256i) / sizeof(T); + dest += sizeof(__m256i); + } +#endif +#if defined(__SSE2__) + // sse + while (j + sizeof(__m128i) / sizeof(T) <= count) + { + if constexpr (sizeof(T) == 2) + { + auto value_sse = _mm_set1_epi16(value); + _mm_storeu_si128(reinterpret_cast<__m128i *>(dest), value_sse); + } + else if constexpr (sizeof(T) == 4) + { + auto value_sse = _mm_set1_epi32(value); + _mm_storeu_si128(reinterpret_cast<__m128i *>(dest), value_sse); + } + else if constexpr (sizeof(T) == 8) + { + auto value_sse = _mm_set1_epi64x(value); + _mm_storeu_si128(reinterpret_cast<__m128i *>(dest), value_sse); + } + j += sizeof(__m128i) / sizeof(T); + dest += sizeof(__m128i); + } +#endif + // scalar + for (; j < count; ++j) + { + unalignedStore(dest, value); + dest += sizeof(T); + } + return dest; +} + +template char * writeSameValueMultipleTime(UInt8, UInt32, char *); +template char * writeSameValueMultipleTime(UInt16, UInt32, char *); +template char * writeSameValueMultipleTime(UInt32, UInt32, char *); +template char * writeSameValueMultipleTime(UInt64, UInt32, char *); + +/// Constant encoding + +template +void constantDecoding(const char * src, UInt32 source_size, char * dest, UInt32 dest_size) +{ + if (unlikely(source_size < sizeof(T))) + throw Exception( + ErrorCodes::CANNOT_DECOMPRESS, + "Cannot use Constant decoding, data size {} is too small", + source_size); + + T constant = unalignedLoad(src); + writeSameValueMultipleTime(constant, dest_size / sizeof(T), dest); +} + +template void constantDecoding(const char *, UInt32, char *, UInt32); +template void constantDecoding(const char *, UInt32, char *, UInt32); +template void constantDecoding(const char *, UInt32, char *, UInt32); 
+template void constantDecoding(const char *, UInt32, char *, UInt32); + +/// ConstantDelta encoding + +template +void constantDeltaDecoding(const char * src, UInt32 source_size, char * dest, UInt32 dest_size) +{ + if (unlikely(source_size < sizeof(T) + sizeof(T))) + throw Exception( + ErrorCodes::CANNOT_DECOMPRESS, + "Cannot use ConstantDelta decoding, data size {} is too small", + source_size); + + T first_value = unalignedLoad(src); + T constant_delta = unalignedLoad(src + sizeof(T)); + for (size_t i = 0; i < dest_size / sizeof(T); ++i) + { + unalignedStore(dest, first_value); + first_value += constant_delta; + dest += sizeof(T); + } +} + +template void constantDeltaDecoding(const char *, UInt32, char *, UInt32); +template void constantDeltaDecoding(const char *, UInt32, char *, UInt32); +template void constantDeltaDecoding(const char *, UInt32, char *, UInt32); +template void constantDeltaDecoding(const char *, UInt32, char *, UInt32); + +/// Delta encoding + +template +void deltaEncoding(const T * source, UInt32 count, T * dest) +{ + T prev = 0; + for (UInt32 i = 0; i < count; ++i) + { + T curr = source[i]; + dest[i] = curr - prev; + prev = curr; + } +} + +template void deltaEncoding(const Int8 *, UInt32, Int8 *); +template void deltaEncoding(const Int16 *, UInt32, Int16 *); +template void deltaEncoding(const Int32 *, UInt32, Int32 *); +template void deltaEncoding(const Int64 *, UInt32, Int64 *); + +/// Delta + FrameOfReference encoding + template void applyFrameOfReference(T * dst, T frame_of_reference, UInt32 count) { @@ -76,37 +215,41 @@ void subtractFrameOfReference(T * dst, T frame_of_reference, UInt32 count) if (frame_of_reference == 0) return; + using TU = std::make_unsigned_t; + auto * unsigned_dst = reinterpret_cast(dst); + auto unsigned_frame_of_reference = static_cast(frame_of_reference); + UInt32 i = 0; #if defined(__AVX2__) UInt32 aligned_count = count - count % (sizeof(__m256i) / sizeof(T)); for (; i < aligned_count; i += (sizeof(__m256i) / 
sizeof(T))) { // Load the data using SIMD - __m256i value = _mm256_loadu_si256(reinterpret_cast<__m256i *>(dst + i)); + __m256i value = _mm256_loadu_si256(reinterpret_cast<__m256i *>(unsigned_dst + i)); // Perform vectorized addition if constexpr (sizeof(T) == 1) { - value = _mm256_sub_epi8(value, _mm256_set1_epi8(frame_of_reference)); + value = _mm256_sub_epi8(value, _mm256_set1_epi8(unsigned_frame_of_reference)); } else if constexpr (sizeof(T) == 2) { - value = _mm256_sub_epi16(value, _mm256_set1_epi16(frame_of_reference)); + value = _mm256_sub_epi16(value, _mm256_set1_epi16(unsigned_frame_of_reference)); } else if constexpr (sizeof(T) == 4) { - value = _mm256_sub_epi32(value, _mm256_set1_epi32(frame_of_reference)); + value = _mm256_sub_epi32(value, _mm256_set1_epi32(unsigned_frame_of_reference)); } else if constexpr (sizeof(T) == 8) { - value = _mm256_sub_epi64(value, _mm256_set1_epi64x(frame_of_reference)); + value = _mm256_sub_epi64(value, _mm256_set1_epi64x(unsigned_frame_of_reference)); } // Store the result back to memory - _mm256_storeu_si256(reinterpret_cast<__m256i *>(dst + i), value); + _mm256_storeu_si256(reinterpret_cast<__m256i *>(unsigned_dst + i), value); } #endif for (; i < count; ++i) { - dst[i] -= frame_of_reference; + unsigned_dst[i] -= unsigned_frame_of_reference; } } @@ -119,43 +262,15 @@ template void subtractFrameOfReference(UInt16 *, UInt16, UInt32); template void subtractFrameOfReference(UInt32 *, UInt32, UInt32); template void subtractFrameOfReference(UInt64 *, UInt64, UInt32); -template -UInt8 FOREncodingWidth(std::vector & values, T frame_of_reference) -{ - assert(!values.empty()); // caller must ensure input is not empty - - if constexpr (std::is_signed_v) - { - // For signed types, after subtracting frame of reference, the range of values is not always [0, max_value - min_value]. - // For example, we have a sequence of Int8 values [-128, 1, 127], after subtracting frame of reference -128, the values are [0, -127, -1]. 
- // The minimum bit width required to store the values is 8 rather than the width of `max_value - min_value = -1`. - // So we need to calculate the minimum bit width of the values after subtracting frame of reference. - subtractFrameOfReference(values.data(), frame_of_reference, values.size()); - auto [min_value, max_value] = std::minmax_element(values.cbegin(), values.cend()); - return BitpackingPrimitives::minimumBitWidth(*min_value, *max_value); - } - else - { - T max_value = *std::max_element(values.cbegin(), values.cend()); - return BitpackingPrimitives::minimumBitWidth(max_value - frame_of_reference); - } -} - -template UInt8 FOREncodingWidth(std::vector &, Int8); -template UInt8 FOREncodingWidth(std::vector &, Int16); -template UInt8 FOREncodingWidth(std::vector &, Int32); -template UInt8 FOREncodingWidth(std::vector &, Int64); -template UInt8 FOREncodingWidth(std::vector &, UInt8); -template UInt8 FOREncodingWidth(std::vector &, UInt16); -template UInt8 FOREncodingWidth(std::vector &, UInt32); -template UInt8 FOREncodingWidth(std::vector &, UInt64); - template void deltaDecoding(const char * source, UInt32 source_size, char * dest) { ordinaryDeltaDecoding(source, source_size, dest); } +template void deltaDecoding(const char *, UInt32, char *); +template void deltaDecoding(const char *, UInt32, char *); + #if defined(__AVX2__) /** @@ -164,11 +279,11 @@ void deltaDecoding(const char * source, UInt32 source_size, char * dest) */ template <> -void deltaDecoding(const char * __restrict__ raw_source, UInt32 raw_source_size, char * __restrict__ raw_dest) +void deltaDecoding(const char * raw_source, UInt32 raw_source_size, char * raw_dest) { - const auto * source = reinterpret_cast(raw_source); - auto source_size = raw_source_size / sizeof(UInt32); - auto * dest = reinterpret_cast(raw_dest); + const auto * source = reinterpret_cast(raw_source); + auto source_size = raw_source_size / sizeof(Int32); + auto * dest = reinterpret_cast(raw_dest); __m128i prev = 
_mm_setzero_si128(); size_t i = 0; for (; i < source_size / 4; i++) @@ -179,7 +294,7 @@ void deltaDecoding(const char * __restrict__ raw_source, UInt32 raw_sour prev = _mm_add_epi32(tmp2, _mm_shuffle_epi32(prev, 0xff)); _mm_storeu_si128(reinterpret_cast<__m128i *>(dest) + i, prev); } - uint32_t lastprev = _mm_extract_epi32(prev, 3); + Int32 lastprev = _mm_extract_epi32(prev, 3); for (i = 4 * i; i < source_size; ++i) { lastprev = lastprev + source[i]; @@ -188,11 +303,11 @@ void deltaDecoding(const char * __restrict__ raw_source, UInt32 raw_sour } template <> -void deltaDecoding(const char * __restrict__ raw_source, UInt32 raw_source_size, char * __restrict__ raw_dest) +void deltaDecoding(const char * raw_source, UInt32 raw_source_size, char * raw_dest) { - const auto * source = reinterpret_cast(raw_source); - auto source_size = raw_source_size / sizeof(UInt64); - auto * dest = reinterpret_cast(raw_dest); + const auto * source = reinterpret_cast(raw_source); + auto source_size = raw_source_size / sizeof(Int64); + auto * dest = reinterpret_cast(raw_dest); // AVX2 does not support shffule across 128-bit lanes, so we need to use permute. 
__m256i prev = _mm256_setzero_si256(); __m256i zero = _mm256_setzero_si256(); @@ -213,7 +328,7 @@ void deltaDecoding(const char * __restrict__ raw_source, UInt32 raw_sour // prev = {prev[3], prev[3], prev[3], prev[3]} prev = _mm256_permute4x64_epi64(prev, 0b11111111); } - UInt64 lastprev = _mm256_extract_epi64(prev, 3); + Int64 lastprev = _mm256_extract_epi64(prev, 3); for (i = 4 * i; i < source_size; ++i) { lastprev += source[i]; @@ -221,6 +336,11 @@ void deltaDecoding(const char * __restrict__ raw_source, UInt32 raw_sour } } +#else + +template void deltaDecoding(const char *, UInt32, char *); +template void deltaDecoding(const char *, UInt32, char *); + #endif template @@ -254,12 +374,14 @@ void deltaFORDecoding(const char * src, UInt32 source_size, char * dest, memset(tmp_buffer, 0, required_size); // copy the first value to the temporary buffer memcpy(tmp_buffer, src, TYPE_BYTE_SIZE); - FORDecoding( + FORDecoding( src + TYPE_BYTE_SIZE, source_size - TYPE_BYTE_SIZE, tmp_buffer + TYPE_BYTE_SIZE, required_size - TYPE_BYTE_SIZE); - deltaDecoding(reinterpret_cast(tmp_buffer), dest_size, dest); + auto * deltas = reinterpret_cast(tmp_buffer + TYPE_BYTE_SIZE); + zigZagDecoding(deltas, deltas_count, deltas); + deltaDecoding(tmp_buffer, dest_size, dest); } template <> @@ -282,13 +404,143 @@ void deltaFORDecoding(const char * src, UInt32 source_size, char * dest, memset(tmp_buffer, 0, required_size); // copy the first value to the temporary buffer memcpy(tmp_buffer, src, TYPE_BYTE_SIZE); - FORDecoding( + FORDecoding( src + TYPE_BYTE_SIZE, source_size - TYPE_BYTE_SIZE, tmp_buffer + TYPE_BYTE_SIZE, required_size - TYPE_BYTE_SIZE); - deltaDecoding(reinterpret_cast(tmp_buffer), dest_size, dest); + auto * deltas = reinterpret_cast(tmp_buffer + TYPE_BYTE_SIZE); + zigZagDecoding(deltas, deltas_count, deltas); + deltaDecoding(tmp_buffer, dest_size, dest); +} + +/// Run-length encoding + +template +size_t runLengthEncodedApproximateSize(const T * source, UInt32 source_size) +{ + T 
prev_value = source[0]; + size_t pair_count = 1; + + for (UInt32 i = 1; i < source_size; ++i) + { + T value = source[i]; + if (prev_value != value) + { + ++pair_count; + prev_value = value; + } + } + return pair_count * RunLengthPairLength; +} + +template size_t runLengthEncodedApproximateSize(const UInt8 *, UInt32); +template size_t runLengthEncodedApproximateSize(const UInt16 *, UInt32); +template size_t runLengthEncodedApproximateSize(const UInt32 *, UInt32); +template size_t runLengthEncodedApproximateSize(const UInt64 *, UInt32); + +template +size_t runLengthEncoding(const T * source, UInt32 source_size, char * dest) +{ + T prev_value = source[0]; + memcpy(dest, source, sizeof(T)); + dest += sizeof(T); + + std::vector counts; + counts.reserve(source_size); + UInt8 count = 1; + + for (UInt32 i = 1; i < source_size; ++i) + { + T value = source[i]; + if (prev_value == value && count < std::numeric_limits::max()) + { + ++count; + } + else + { + counts.push_back(count); + unalignedStore(dest, value); + dest += sizeof(T); + prev_value = value; + count = 1; + } + } + counts.push_back(count); + + memcpy(dest, counts.data(), counts.size()); + return counts.size() * RunLengthPairLength; +} + +template size_t runLengthEncoding(const UInt8 *, UInt32, char *); +template size_t runLengthEncoding(const UInt16 *, UInt32, char *); +template size_t runLengthEncoding(const UInt32 *, UInt32, char *); +template size_t runLengthEncoding(const UInt64 *, UInt32, char *); + +template +void runLengthDecoding(const char * src, UInt32 source_size, char * dest, UInt32 dest_size) +{ + if (unlikely(source_size % RunLengthPairLength != 0)) + throw Exception( + ErrorCodes::CANNOT_DECOMPRESS, + "Cannot use RunLength decoding, data size {} is not aligned to {}", + source_size, + RunLengthPairLength); + + const auto pair_count = source_size / RunLengthPairLength; + const char * count_src = src + pair_count * sizeof(T); + + const char * dest_end = dest + dest_size; + for (UInt32 i = 0; i < 
pair_count; ++i) + { + T value = unalignedLoad(src); + src += sizeof(T); + auto count = unalignedLoad(count_src); + count_src += sizeof(UInt8); + if (unlikely(dest + count * sizeof(T) > dest_end)) + throw Exception( + ErrorCodes::CANNOT_DECOMPRESS, + "Cannot use RunLength decoding, data is too large, value={}, count={} elem_byte={}", + value, + count, + sizeof(T)); + dest = writeSameValueMultipleTime(value, count, dest); + } +} + +template void runLengthDecoding(const char *, UInt32, char *, UInt32); +template void runLengthDecoding(const char *, UInt32, char *, UInt32); +template void runLengthDecoding(const char *, UInt32, char *, UInt32); +template void runLengthDecoding(const char *, UInt32, char *, UInt32); + +/// ZigZag encoding + +template +void zigZagEncoding(const T * source, UInt32 count, T * dest) +{ + using TS = typename std::make_signed::type; + for (UInt32 i = 0; i < count; ++i) + { + TS value = source[i]; + dest[i] = (value << 1) ^ (value >> (sizeof(T) * 8 - 1)); + } +} + +template void zigZagEncoding(const UInt8 *, UInt32, UInt8 *); +template void zigZagEncoding(const UInt16 *, UInt32, UInt16 *); +template void zigZagEncoding(const UInt32 *, UInt32, UInt32 *); +template void zigZagEncoding(const UInt64 *, UInt32, UInt64 *); + +template +void zigZagDecoding(const T * source, UInt32 count, T * dest) +{ + for (UInt32 i = 0; i < count; ++i) + dest[i] = (source[i] >> 1) ^ (-(source[i] & 1)); } +template void zigZagDecoding(const UInt8 *, UInt32, UInt8 *); +template void zigZagDecoding(const UInt16 *, UInt32, UInt16 *); +template void zigZagDecoding(const UInt32 *, UInt32, UInt32 *); +template void zigZagDecoding(const UInt64 *, UInt32, UInt64 *); } // namespace DB::Compression diff --git a/dbms/src/IO/Compression/EncodingUtil.h b/dbms/src/IO/Compression/EncodingUtil.h index 24cad02ab5c..7f544d0ea7f 100644 --- a/dbms/src/IO/Compression/EncodingUtil.h +++ b/dbms/src/IO/Compression/EncodingUtil.h @@ -19,9 +19,9 @@ #include #include -#if defined(__AVX2__) 
-#include -#endif +#include +#include + namespace DB::ErrorCodes { @@ -32,36 +32,25 @@ extern const int CANNOT_DECOMPRESS; namespace DB::Compression { +template +char * writeSameValueMultipleTime(T value, UInt32 count, char * dest); + /// Constant encoding template -size_t constantEncoding(T constant, char * dest) +inline size_t constantEncoding(T constant, char * dest) { unalignedStore(dest, constant); return sizeof(T); } template -void constantDecoding(const char * src, UInt32 source_size, char * dest, UInt32 dest_size) -{ - if (unlikely(source_size < sizeof(T))) - throw Exception( - ErrorCodes::CANNOT_DECOMPRESS, - "Cannot use Constant decoding, data size {} is too small", - source_size); - - T constant = unalignedLoad(src); - for (size_t i = 0; i < dest_size / sizeof(T); ++i) - { - unalignedStore(dest, constant); - dest += sizeof(T); - } -} +void constantDecoding(const char * src, UInt32 source_size, char * dest, UInt32 dest_size); /// Constant delta encoding template -size_t constantDeltaEncoding(T first_value, T constant_delta, char * dest) +inline size_t constantDeltaEncoding(T first_value, T constant_delta, char * dest) { unalignedStore(dest, first_value); dest += sizeof(T); @@ -70,23 +59,7 @@ size_t constantDeltaEncoding(T first_value, T constant_delta, char * dest) } template -void constantDeltaDecoding(const char * src, UInt32 source_size, char * dest, UInt32 dest_size) -{ - if (unlikely(source_size < sizeof(T) + sizeof(T))) - throw Exception( - ErrorCodes::CANNOT_DECOMPRESS, - "Cannot use ConstantDelta decoding, data size {} is too small", - source_size); - - T first_value = unalignedLoad(src); - T constant_delta = unalignedLoad(src + sizeof(T)); - for (size_t i = 0; i < dest_size / sizeof(T); ++i) - { - unalignedStore(dest, first_value); - first_value += constant_delta; - dest += sizeof(T); - } -} +void constantDeltaDecoding(const char * src, UInt32 source_size, char * dest, UInt32 dest_size); /// Run-length encoding @@ -98,63 +71,17 @@ using 
RunLengthPairs = std::vector>; template static constexpr size_t RunLengthPairLength = sizeof(T) + sizeof(UInt8); +// Return the approximate size of the run-length encoded data. The actual size may be larger. template -size_t runLengthPairsByteSize(const RunLengthPairs & rle) -{ - return rle.size() * RunLengthPairLength; -} +size_t runLengthEncodedApproximateSize(const T * source, UInt32 source_size); +// After run-length encoding, the values stored in `dest` are: +// [val1, val2, val3, ..., valn, cnt1, cnt2, ..., cntn] template -size_t runLengthEncoding(const RunLengthPairs & rle, char * dest) -{ - for (const auto & [value, count] : rle) - { - unalignedStore(dest, value); - dest += sizeof(T); - unalignedStore(dest, count); - dest += sizeof(UInt8); - } - return rle.size() * RunLengthPairLength; -} +size_t runLengthEncoding(const T * source, UInt32 source_size, char * dest); template -void runLengthDecoding(const char * src, UInt32 source_size, char * dest, UInt32 dest_size) -{ - if (unlikely(source_size % RunLengthPairLength != 0)) - throw Exception( - ErrorCodes::CANNOT_DECOMPRESS, - "Cannot use RunLength decoding, data size {} is not aligned to {}", - source_size, - RunLengthPairLength); - - const char * dest_end = dest + dest_size; - for (UInt32 i = 0; i < source_size / RunLengthPairLength; ++i) - { - T value = unalignedLoad(src); - src += sizeof(T); - auto count = unalignedLoad(src); - src += sizeof(UInt8); - if (unlikely(dest + count * sizeof(T) > dest_end)) - throw Exception( - ErrorCodes::CANNOT_DECOMPRESS, - "Cannot use RunLength decoding, data is too large, count={} elem_byte={}", - count, - sizeof(T)); - if constexpr (std::is_same_v || std::is_same_v) - { - memset(dest, value, count); - dest += count * sizeof(T); - } - else - { - for (UInt32 j = 0; j < count; ++j) - { - unalignedStore(dest, value); - dest += sizeof(T); - } - } - } -} +void runLengthDecoding(const char * src, UInt32 source_size, char * dest, UInt32 dest_size); /// Frame of Reference 
encoding @@ -162,15 +89,11 @@ template void subtractFrameOfReference(T * dst, T frame_of_reference, UInt32 count); template -UInt8 FOREncodingWidth(std::vector & values, T frame_of_reference); - -template -size_t FOREncoding(std::vector & values, T frame_of_reference, UInt8 width, char * dest) +size_t FOREncoding(T * values, UInt32 count, T frame_of_reference, UInt8 width, char * dest) { - assert(!values.empty()); // caller must ensure input is not empty + assert(count != 0); // caller must ensure input is not empty - if constexpr (!skip_subtract_frame_of_reference) - subtractFrameOfReference(values.data(), frame_of_reference, values.size()); + subtractFrameOfReference(values, frame_of_reference, count); // store frame of reference unalignedStore(dest, frame_of_reference); dest += sizeof(T); @@ -180,9 +103,9 @@ size_t FOREncoding(std::vector & values, T frame_of_reference, UInt8 width, c // if width == 0, skip bitpacking if (width == 0) return sizeof(T) + sizeof(UInt8); - auto required_size = BitpackingPrimitives::getRequiredSize(values.size(), width); + auto required_size = BitpackingPrimitives::getRequiredSize(count, width); // after applying frame of reference, all values are bigger than 0. 
- BitpackingPrimitives::packBuffer(reinterpret_cast(dest), values.data(), values.size(), width); + BitpackingPrimitives::packBuffer(reinterpret_cast(dest), values, count, width); return sizeof(T) + sizeof(UInt8) + required_size; } @@ -225,19 +148,18 @@ void FORDecoding(const char * src, UInt32 source_size, char * dest, UInt32 dest_ applyFrameOfReference(reinterpret_cast(dest), frame_of_reference, count); } +/// ZigZag encoding + +template +void zigZagEncoding(const T * source, UInt32 count, T * dest); + +template +void zigZagDecoding(const T * source, UInt32 count, T * dest); + /// Delta encoding template -void deltaEncoding(const T * source, UInt32 count, T * dest) -{ - T prev = 0; - for (UInt32 i = 0; i < count; ++i) - { - T curr = source[i]; - dest[i] = curr - prev; - prev = curr; - } -} +void deltaEncoding(const T * source, UInt32 count, T * dest); template void ordinaryDeltaDecoding(const char * source, UInt32 source_size, char * dest) @@ -266,14 +188,18 @@ void ordinaryDeltaFORDecoding(const char * src, UInt32 source_size, char * dest, assert(source_size >= sizeof(T)); assert(dest_size >= sizeof(T)); - using TS = typename std::make_signed_t; // copy first value to dest memcpy(dest, src, sizeof(T)); if (unlikely(source_size == sizeof(T))) return; - // decode deltas - FORDecoding(src + sizeof(T), source_size - sizeof(T), dest + sizeof(T), dest_size - sizeof(T)); - ordinaryDeltaDecoding(dest, dest_size, dest); + // FOR decode deltas + FORDecoding(src + sizeof(T), source_size - sizeof(T), dest + sizeof(T), dest_size - sizeof(T)); + // ZigZag decode deltas + auto * deltas = reinterpret_cast(dest + sizeof(T)); + zigZagDecoding(deltas, (dest_size - sizeof(T)) / sizeof(T), deltas); + // delta decode + using TS = typename std::make_signed::type; + ordinaryDeltaDecoding(dest, dest_size, dest); } template diff --git a/dbms/src/IO/Compression/tests/bench_codec.cpp b/dbms/src/IO/Compression/tests/bench_codec.cpp index 272aba82b34..7ef61e2021d 100644 --- 
a/dbms/src/IO/Compression/tests/bench_codec.cpp +++ b/dbms/src/IO/Compression/tests/bench_codec.cpp @@ -272,4 +272,45 @@ static void multipleRead(benchmark::State & state) BENCH_MULTIPLE_WRITE(CodecMultipleWrite) BENCH_MULTIPLE_READ(CodecMultipleRead) +BENCH_SINGLE_READ_METHOD_GENERATOR_TYPE( + RunLengthReadUInt8, + CompressionMethodByte::RunLength, + tests::RepeatGenerator(0), + UInt8) +BENCH_SINGLE_READ_METHOD_GENERATOR_TYPE( + RunLengthReadUInt16, + CompressionMethodByte::RunLength, + tests::RepeatGenerator(0), + UInt16) +BENCH_SINGLE_READ_METHOD_GENERATOR_TYPE( + RunLengthReadUInt32, + CompressionMethodByte::RunLength, + tests::RepeatGenerator(0), + UInt32) +BENCH_SINGLE_READ_METHOD_GENERATOR_TYPE( + RunLengthReadUInt64, + CompressionMethodByte::RunLength, + tests::RepeatGenerator(0), + UInt64) +BENCH_SINGLE_WRITE_METHOD_GENERATOR_TYPE( + RunLengthWriteUInt8, + CompressionMethodByte::RunLength, + tests::RepeatGenerator(0), + UInt8) +BENCH_SINGLE_WRITE_METHOD_GENERATOR_TYPE( + RunLengthWriteUInt16, + CompressionMethodByte::RunLength, + tests::RepeatGenerator(0), + UInt16) +BENCH_SINGLE_WRITE_METHOD_GENERATOR_TYPE( + RunLengthWriteUInt32, + CompressionMethodByte::RunLength, + tests::RepeatGenerator(0), + UInt32) +BENCH_SINGLE_WRITE_METHOD_GENERATOR_TYPE( + RunLengthWriteUInt64, + CompressionMethodByte::RunLength, + tests::RepeatGenerator(0), + UInt64) + } // namespace DB::bench diff --git a/dbms/src/IO/Compression/tests/gtest_codec_compression.cpp b/dbms/src/IO/Compression/tests/gtest_codec_compression.cpp index 1c3a05df915..f359c2f9e49 100644 --- a/dbms/src/IO/Compression/tests/gtest_codec_compression.cpp +++ b/dbms/src/IO/Compression/tests/gtest_codec_compression.cpp @@ -390,7 +390,6 @@ INSTANTIATE_TEST_CASE_P( generateSeq(G(RandomGenerator(0, 0, 1000'000'000))), generateSeq(G(RandomGenerator(0, 0, 1000'000'000)))))); - INSTANTIATE_TEST_CASE_P( RepeatInt, CodecTest, diff --git a/dbms/src/Interpreters/SettingsCommon.h b/dbms/src/Interpreters/SettingsCommon.h 
index 9c2d1700296..cbefcd610fc 100644 --- a/dbms/src/Interpreters/SettingsCommon.h +++ b/dbms/src/Interpreters/SettingsCommon.h @@ -56,12 +56,12 @@ struct SettingInt public: bool changed = false; - SettingInt(IntType x = 0) + SettingInt(IntType x = 0) // NOLINT(google-explicit-constructor) : value(x) {} SettingInt(const SettingInt & setting); - operator IntType() const { return value.load(); } + operator IntType() const { return value.load(); } // NOLINT(google-explicit-constructor) SettingInt & operator=(IntType x) { set(x); @@ -113,12 +113,12 @@ struct SettingMaxThreads bool is_auto; bool changed = false; - SettingMaxThreads(UInt64 x = 0) + SettingMaxThreads(UInt64 x = 0) // NOLINT(google-explicit-constructor) : is_auto(x == 0) , value(x ? x : getAutoValue()) {} - operator UInt64() const { return value; } + operator UInt64() const { return value; } // NOLINT(google-explicit-constructor) SettingMaxThreads & operator=(UInt64 x) { set(x); @@ -187,11 +187,11 @@ struct SettingSeconds public: bool changed = false; - SettingSeconds(UInt64 seconds = 0) + SettingSeconds(UInt64 seconds = 0) // NOLINT(google-explicit-constructor) : value(seconds, 0) {} - operator Poco::Timespan() const { return value; } + operator Poco::Timespan() const { return value; } // NOLINT(google-explicit-constructor) SettingSeconds & operator=(const Poco::Timespan & x) { set(x); @@ -235,11 +235,11 @@ struct SettingMilliseconds public: bool changed = false; - SettingMilliseconds(UInt64 milliseconds = 0) + SettingMilliseconds(UInt64 milliseconds = 0) // NOLINT(google-explicit-constructor) : value(milliseconds * 1000) {} - operator Poco::Timespan() const { return value; } + operator Poco::Timespan() const { return value; } // NOLINT(google-explicit-constructor) SettingMilliseconds & operator=(const Poco::Timespan & x) { set(x); @@ -283,11 +283,11 @@ struct SettingFloat public: bool changed = false; - SettingFloat(float x = 0) + SettingFloat(float x = 0) // NOLINT(google-explicit-constructor) : 
value(x) {} SettingFloat(const SettingFloat & setting) { value.store(setting.value.load()); } - operator float() const { return value.load(); } + operator float() const { return value.load(); } // NOLINT(google-explicit-constructor) SettingFloat & operator=(float x) { set(x); @@ -390,11 +390,11 @@ struct SettingDouble public: bool changed = false; - SettingDouble(double x = 0) + SettingDouble(double x = 0) // NOLINT(google-explicit-constructor) : value(x) {} SettingDouble(const SettingDouble & setting) { value.store(setting.value.load()); } - operator double() const { return value.load(); } + operator double() const { return value.load(); } // NOLINT(google-explicit-constructor) SettingDouble & operator=(double x) { set(x); @@ -473,11 +473,11 @@ struct SettingLoadBalancing public: bool changed = false; - SettingLoadBalancing(LoadBalancing x) + explicit SettingLoadBalancing(LoadBalancing x) : value(x) {} - operator LoadBalancing() const { return value; } + operator LoadBalancing() const { return value; } // NOLINT(google-explicit-constructor) SettingLoadBalancing & operator=(LoadBalancing x) { set(x); @@ -537,11 +537,11 @@ struct SettingOverflowMode public: bool changed = false; - SettingOverflowMode(OverflowMode x = OverflowMode::THROW) + explicit SettingOverflowMode(OverflowMode x = OverflowMode::THROW) : value(x) {} - operator OverflowMode() const { return value; } + operator OverflowMode() const { return value; } // NOLINT(google-explicit-constructor) SettingOverflowMode & operator=(OverflowMode x) { set(x); @@ -602,7 +602,7 @@ struct SettingChecksumAlgorithm public: bool changed = false; - SettingChecksumAlgorithm(ChecksumAlgo x = ChecksumAlgo::XXH3) // NOLINT(google-explicit-constructor) + explicit SettingChecksumAlgorithm(ChecksumAlgo x = ChecksumAlgo::XXH3) : value(x) {} @@ -678,11 +678,11 @@ struct SettingCompressionMethod public: bool changed = false; - SettingCompressionMethod(CompressionMethod x = CompressionMethod::LZ4) + explicit 
SettingCompressionMethod(CompressionMethod x = CompressionMethod::LZ4) : value(x) {} - operator CompressionMethod() const { return value; } + operator CompressionMethod() const { return value; } // NOLINT(google-explicit-constructor) SettingCompressionMethod & operator=(CompressionMethod x) { set(x); @@ -701,28 +701,35 @@ struct SettingCompressionMethod #if USE_QPL if (lower_str == "qpl") return CompressionMethod::QPL; +#endif + if (lower_str == "none") + return CompressionMethod::NONE; + if (lower_str == "lightweight") + return CompressionMethod::Lightweight; +#if USE_QPL throw Exception( - "Unknown compression method: '" + s + "', must be one of 'lz4', 'lz4hc', 'zstd', 'qpl'", - ErrorCodes::UNKNOWN_COMPRESSION_METHOD); + ErrorCodes::UNKNOWN_COMPRESSION_METHOD, + "Unknown compression method: '{}', must be one of 'lz4', 'lz4hc', 'zstd', 'qpl', 'none', 'lightweight'", + s); #else throw Exception( - "Unknown compression method: '" + s + "', must be one of 'lz4', 'lz4hc', 'zstd'", - ErrorCodes::UNKNOWN_COMPRESSION_METHOD); + ErrorCodes::UNKNOWN_COMPRESSION_METHOD, + "Unknown compression method: '{}', must be one of 'lz4', 'lz4hc', 'zstd', 'none', 'lightweight'", + s); #endif } String toString() const { -#if USE_QPL - const char * strings[] = {nullptr, "lz4", "lz4hc", "zstd", "qpl"}; - auto compression_method_last = CompressionMethod::QPL; -#else - const char * strings[] = {nullptr, "lz4", "lz4hc", "zstd"}; - auto compression_method_last = CompressionMethod::ZSTD; -#endif + const char * strings[] = {nullptr, "lz4", "lz4hc", "zstd", "qpl", "none", "lightweight"}; + auto compression_method_last = CompressionMethod::Lightweight; if (value < CompressionMethod::LZ4 || value > compression_method_last) throw Exception("Unknown compression method", ErrorCodes::UNKNOWN_COMPRESSION_METHOD); +#if !USE_QPL + if (unlikely(value == CompressionMethod::QPL)) + throw Exception("Unknown compression method", ErrorCodes::UNKNOWN_COMPRESSION_METHOD); +#endif return 
strings[static_cast(value)]; } @@ -757,11 +764,11 @@ struct SettingTaskQueueType public: bool changed = false; - SettingTaskQueueType(TaskQueueType x = TaskQueueType::DEFAULT) + explicit SettingTaskQueueType(TaskQueueType x = TaskQueueType::DEFAULT) : value(x) {} - operator TaskQueueType() const { return value; } + operator TaskQueueType() const { return value; } // NOLINT(google-explicit-constructor) SettingTaskQueueType & operator=(TaskQueueType x) { set(x); @@ -811,11 +818,11 @@ struct SettingString public: bool changed = false; - SettingString(const String & x = String{}) + explicit SettingString(const String & x = String{}) : value(x) {} - operator String() const { return value; } + operator String() const { return value; } // NOLINT(google-explicit-constructor) SettingString & operator=(const String & x) { set(x); diff --git a/dbms/src/Server/tests/gtest_server_config.cpp b/dbms/src/Server/tests/gtest_server_config.cpp index b90d0494b88..f8a3210b1bc 100644 --- a/dbms/src/Server/tests/gtest_server_config.cpp +++ b/dbms/src/Server/tests/gtest_server_config.cpp @@ -287,6 +287,16 @@ dt_compression_level = 1 [profiles.default] dt_compression_method = "LZ4" dt_compression_level = 1 + )", + R"( +[profiles] +[profiles.default] +dt_compression_method = "Lightweight" + )", + R"( +[profiles] +[profiles.default] +dt_compression_method = "lightweight" )"}; auto & global_ctx = TiFlashTestEnv::getGlobalContext(); @@ -346,6 +356,14 @@ dt_compression_level = 1 ASSERT_EQ(global_ctx.getSettingsRef().dt_compression_method, CompressionMethod::LZ4); ASSERT_EQ(global_ctx.getSettingsRef().dt_compression_level, 1); } + if (i == 6) + { + ASSERT_EQ(global_ctx.getSettingsRef().dt_compression_method, CompressionMethod::Lightweight); + } + if (i == 7) + { + ASSERT_EQ(global_ctx.getSettingsRef().dt_compression_method, CompressionMethod::Lightweight); + } } global_ctx.setSettings(origin_settings); } diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.cpp 
b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.cpp index c2995080b05..43fbef790f6 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.cpp @@ -101,7 +101,11 @@ void serializeColumn( CompressionMethod compression_method, Int64 compression_level) { - CompressedWriteBuffer compressed(buf, CompressionSettings(compression_method, compression_level)); + // Do not use lightweight compression in ColumnFile whose write performance is the bottleneck. + auto settings = compression_method == CompressionMethod::Lightweight + ? CompressionSettings(CompressionMethod::LZ4) + : CompressionSettings(compression_method, compression_level); + CompressedWriteBuffer compressed(buf, settings); type->serializeBinaryBulkWithMultipleStreams( column, [&](const IDataType::SubstreamPath &) { return &compressed; }, diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h index d85185bc729..b7836c21321 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h @@ -23,10 +23,10 @@ #include #include -namespace DB -{ -namespace DM + +namespace DB::DM { + namespace detail { static inline DB::ChecksumAlgo getAlgorithmOrNone(DMFile & dmfile) @@ -38,6 +38,7 @@ static inline size_t getFrameSizeOrDefault(DMFile & dmfile) return dmfile.getConfiguration() ? dmfile.getConfiguration()->getChecksumFrameLength() : DBMS_DEFAULT_BUFFER_SIZE; } } // namespace detail + class DMFileWriter { public: @@ -65,12 +66,18 @@ class DMFileWriter /*flags*/ -1, /*mode*/ 0666, max_compress_block_size)) - , compressed_buf(CompressedWriteBuffer<>::build( - *plain_file, - compression_settings, - !dmfile->getConfiguration().has_value())) , minmaxes(do_index ? 
std::make_shared(*type) : nullptr) { + assert(compression_settings.settings.size() == 1); + auto setting = CompressionSetting::create<>( + compression_settings.settings[0].method, + compression_settings.settings[0].level, + *type); + compressed_buf = CompressedWriteBuffer<>::build( + *plain_file, + CompressionSettings(setting), + !dmfile->getConfiguration()); + if (!dmfile->useMetaV2()) { // will not used in DMFileFormat::V3, could be removed when v3 is default @@ -181,5 +188,4 @@ class DMFileWriter bool is_empty_file = true; }; -} // namespace DM -} // namespace DB +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp index d77c365492b..2532a760874 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp @@ -174,10 +174,10 @@ class DMFileMetaV2Test : public DB::base::TiFlashStorageTestBasic } protected: - std::unique_ptr dm_context{}; + std::unique_ptr dm_context; /// all these var live as ref in dm_context - std::shared_ptr path_pool{}; - std::shared_ptr storage_pool{}; + std::shared_ptr path_pool; + std::shared_ptr storage_pool; ColumnDefinesPtr table_columns; DeltaMergeStore::Settings settings; @@ -363,7 +363,6 @@ try CATCH // test multiple data into v3, and read it -// check there are only index and mrk and null.data in merged TEST_F(DMFileMetaV2Test, CheckDMFileV3WithMultiData) try { @@ -387,7 +386,6 @@ try ASSERT_EQ(dm_file->getPackProperties().property_size(), 1); } - { // Test read DMFileBlockInputStreamBuilder builder(dbContext()); @@ -415,7 +413,7 @@ try files.insert(itr.name()); } } - ASSERT_EQ(files.size(), 3); // handle data / col data and merged file + ASSERT_EQ(files.size(), 3); // handle, col data and merged file ASSERT(files.find("0.merged") != files.end()); ASSERT(files.find("%2D1.dat") != files.end()); ASSERT(files.find("1.dat") != files.end()); @@ -636,11 +634,13 @@ try const size_t 
num_rows_write = 128; DMFileBlockOutputStream::BlockProperty block_property1{ + .not_clean_rows = 0, .deleted_rows = 1, .effective_num_rows = 1, .gc_hint_version = 1, }; DMFileBlockOutputStream::BlockProperty block_property2{ + .not_clean_rows = 0, .deleted_rows = 2, .effective_num_rows = 2, .gc_hint_version = 2, @@ -733,16 +733,19 @@ try const size_t num_rows_write = 8192; DMFileBlockOutputStream::BlockProperty block_property1{ + .not_clean_rows = 0, .deleted_rows = 1, .effective_num_rows = 1, .gc_hint_version = 1, }; DMFileBlockOutputStream::BlockProperty block_property2{ + .not_clean_rows = 0, .deleted_rows = 2, .effective_num_rows = 2, .gc_hint_version = 2, }; DMFileBlockOutputStream::BlockProperty block_property3{ + .not_clean_rows = 0, .deleted_rows = 3, .effective_num_rows = 3, .gc_hint_version = 3, @@ -1046,16 +1049,19 @@ try const size_t num_rows_write = 8192; DMFileBlockOutputStream::BlockProperty block_property1{ + .not_clean_rows = 0, .deleted_rows = 1, .effective_num_rows = 1, .gc_hint_version = 1, }; DMFileBlockOutputStream::BlockProperty block_property2{ + .not_clean_rows = 0, .deleted_rows = 2, .effective_num_rows = 2, .gc_hint_version = 2, }; DMFileBlockOutputStream::BlockProperty block_property3{ + .not_clean_rows = 0, .deleted_rows = 3, .effective_num_rows = 3, .gc_hint_version = 3, @@ -1139,16 +1145,19 @@ try const size_t num_rows_write = 1024; DMFileBlockOutputStream::BlockProperty block_property1{ + .not_clean_rows = 0, .deleted_rows = 1, .effective_num_rows = 1, .gc_hint_version = 1, }; DMFileBlockOutputStream::BlockProperty block_property2{ + .not_clean_rows = 0, .deleted_rows = 2, .effective_num_rows = 2, .gc_hint_version = 2, }; DMFileBlockOutputStream::BlockProperty block_property3{ + .not_clean_rows = 0, .deleted_rows = 3, .effective_num_rows = 3, .gc_hint_version = 3, @@ -1981,10 +1990,10 @@ class DMFileClusteredIndexTest private: String path; - std::unique_ptr dm_context{}; + std::unique_ptr dm_context; /// all these var live as ref in 
dm_context - std::shared_ptr path_pool{}; - std::shared_ptr storage_pool{}; + std::shared_ptr path_pool; + std::shared_ptr storage_pool; ColumnDefinesPtr table_columns; DeltaMergeStore::Settings settings; diff --git a/metrics/grafana/tiflash_summary.json b/metrics/grafana/tiflash_summary.json index e707499a3fb..747f92163df 100644 --- a/metrics/grafana/tiflash_summary.json +++ b/metrics/grafana/tiflash_summary.json @@ -8413,10 +8413,7 @@ "id": "filterFieldsByName", "options": { "include": { - "names": [ - "Time", - "mark cache effectiveness" - ] + "names": ["Time", "mark cache effectiveness"] } } } @@ -8574,8 +8571,8 @@ "gridPos": { "h": 8, "w": 12, - "x": 0, - "y": 165 + "x": 12, + "y": 117 }, "hiddenSeries": false, "id": 292, @@ -9337,6 +9334,213 @@ "align": false, "alignLevel": null } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The count of the compression algorithm used by each data part", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 117 + }, + "hiddenSeries": false, + "id": 293, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "total": true, + "values": true + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.17", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(rate(tiflash_storage_pack_compression_algorithm_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (type)", + "hide": false, + "interval": "", + "legendFormat": "{{type}}", + "queryType": "randomWalk", + 
"refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Compression Algorithm Count", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "$$hashKey": "object:304", + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "$$hashKey": "object:305", + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The compression ratio of different compression algorithm", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 109 + }, + "hiddenSeries": false, + "id": 294, + "legend": { + "alignAsTable": true, + "avg": true, + "current": true, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.17", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(rate(tiflash_storage_pack_compression_bytes{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", type=~\"lz4_uncompressed_bytes\"}[1m]))/sum(rate(tiflash_storage_pack_compression_bytes{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", type=~\"lz4_compressed_bytes\"}[1m]))", + "interval": "", + 
"legendFormat": "lz4", + "queryType": "randomWalk", + "refId": "A" + }, + { + "exemplar": true, + "expr": "sum(rate(tiflash_storage_pack_compression_bytes{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", type=~\"lightweight_uncompressed_bytes\"}[1m]))/sum(rate(tiflash_storage_pack_compression_bytes{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", type=~\"lightweight_compressed_bytes\"}[1m]))", + "hide": false, + "interval": "", + "legendFormat": "lightweight", + "refId": "B" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Compression Ratio", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } } ], "repeat": null, @@ -19681,20 +19885,10 @@ "2h", "1d" ], - "time_options": [ - "5m", - "15m", - "1h", - "6h", - "12h", - "24h", - "2d", - "7d", - "30d" - ] + "time_options": ["5m", "15m", "1h", "6h", "12h", "24h", "2d", "7d", "30d"] }, "timezone": "", "title": "Test-Cluster-TiFlash-Summary", "uid": "SVbh2xUWk", "version": 1 -} \ No newline at end of file +} diff --git a/tests/docker/config/tiflash_dt_lightweight_compression.toml b/tests/docker/config/tiflash_dt_lightweight_compression.toml new file mode 100644 index 00000000000..cdae444012c --- /dev/null +++ b/tests/docker/config/tiflash_dt_lightweight_compression.toml @@ -0,0 +1,50 @@ +# Copyright 2024 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +tmp_path = "/tmp/tiflash/data/tmp" + +path = "/tmp/tiflash/data/db" +capacity = "10737418240" + +mark_cache_size = 5368709120 +minmax_index_cache_size = 5368709120 +tcp_port = 9000 + +[flash] +service_addr = "0.0.0.0:3930" +[flash.proxy] +addr = "0.0.0.0:20170" +advertise-addr = "tiflash0:20170" +data-dir = "/data" +config = "/proxy.toml" +log-file = "/log/proxy.log" +engine-addr = "tiflash0:3930" +status-addr = "0.0.0.0:20181" +advertise-status-addr = "tiflash0:20181" + +[logger] +count = 10 +errorlog = "/tmp/tiflash/log/error.log" +size = "1000M" +log = "/tmp/tiflash/log/server.log" +level = "trace" + +[raft] +pd_addr = "pd0:2379" +ignore_databases = "system,default" + +[profiles] +[profiles.default] +dt_compression_method = "Lightweight" +dt_compression_level = 3 diff --git a/tests/docker/tiflash-dt-lightweight-compression.yaml b/tests/docker/tiflash-dt-lightweight-compression.yaml new file mode 100644 index 00000000000..34690f23905 --- /dev/null +++ b/tests/docker/tiflash-dt-lightweight-compression.yaml @@ -0,0 +1,43 @@ +# Copyright 2024 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +version: '2.3' + +services: + # for tests under fullstack-test directory + # (engine DeltaTree) + tiflash0: + image: hub.pingcap.net/tiflash/tiflash-ci-base + volumes: + - ./config/tiflash_dt_lightweight_compression.toml:/config.toml:ro + - ./data/tiflash:/tmp/tiflash/data + - ./log/tiflash:/tmp/tiflash/log + - ..:/tests + - ../docker/_env.sh:/tests/_env.sh + - ./log/tiflash-cluster-manager:/tmp/tiflash/data/tmp + - ./config/proxy.toml:/proxy.toml:ro + - ./config/cipher-file-256:/cipher-file-256:ro + - ./data/proxy:/data + - ./log/proxy:/log + - ../.build/tiflash:/tiflash + entrypoint: + - /tiflash/tiflash + - server + - --config-file + - /config.toml + restart: on-failure + depends_on: + - "pd0" + - "tikv0" + diff --git a/tests/tidb-ci/lightweight_compression b/tests/tidb-ci/lightweight_compression new file mode 120000 index 00000000000..12c5821f12c --- /dev/null +++ b/tests/tidb-ci/lightweight_compression @@ -0,0 +1 @@ +../fullstack-test/mpp/ \ No newline at end of file diff --git a/tests/tidb-ci/run.sh b/tests/tidb-ci/run.sh index 144232e9971..4c5f3abdab5 100755 --- a/tests/tidb-ci/run.sh +++ b/tests/tidb-ci/run.sh @@ -61,3 +61,12 @@ docker-compose -f cluster.yaml -f tiflash-dt-force-enable-lm.yaml exec -T tiflas docker-compose -f cluster.yaml -f tiflash-dt-force-enable-lm.yaml down clean_data_log + +# TODO: now set dt_compression_method = "lightweight" by default, so we don't need to run this test. 
+# run lightweight compression tests +# docker-compose -f cluster.yaml -f tiflash-dt-lightweight-compression.yaml up -d +# wait_env +# docker-compose -f cluster.yaml -f tiflash-dt-lightweight-compression.yaml exec -T tiflash0 bash -c 'cd /tests ; ./run-test.sh tidb-ci/lightweight_compression' + +# docker-compose -f cluster.yaml -f tiflash-dt-lightweight-compression.yaml down +# clean_data_log diff --git a/tests/tidb-ci/tiflash-dt-lightweight-compression.yaml b/tests/tidb-ci/tiflash-dt-lightweight-compression.yaml new file mode 120000 index 00000000000..2c14bbd48b9 --- /dev/null +++ b/tests/tidb-ci/tiflash-dt-lightweight-compression.yaml @@ -0,0 +1 @@ +../docker/tiflash-dt-lightweight-compression.yaml \ No newline at end of file