Skip to content

Commit

Permalink
Compression: Support enabling lightweight compression (#9312)
Browse files Browse the repository at this point in the history
close #8982, close #9336

Compression: Support enabling lightweight compression

Signed-off-by: Lloyd-Pottiger <[email protected]>

Co-authored-by: jinhelin <[email protected]>
  • Loading branch information
Lloyd-Pottiger and JinheLin authored Sep 6, 2024
1 parent 0a9c4bd commit 627fd1f
Show file tree
Hide file tree
Showing 30 changed files with 1,051 additions and 428 deletions.
4 changes: 3 additions & 1 deletion .clangd
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
CompileFlags:
Add: -ferror-limit=0
Add:
- -ferror-limit=0
- -Wno-vla-cxx-extension
18 changes: 17 additions & 1 deletion dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -868,7 +868,23 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
"Durations of read thread internal components", \
Histogram, \
F(type_block_queue_pop_latency, {{"type", "block_queue_pop_latency"}}, ExpBuckets{1, 2, 20}), \
F(type_schedule_one_round, {{"type", "schedule_one_round"}}, ExpBuckets{1, 2, 20}))
F(type_schedule_one_round, {{"type", "schedule_one_round"}}, ExpBuckets{1, 2, 20})) \
M(tiflash_storage_pack_compression_algorithm_count, \
"The count of the compression algorithm used by each data part", \
Counter, \
F(type_constant, {"type", "constant"}), \
F(type_constant_delta, {"type", "constant_delta"}), \
F(type_runlength, {"type", "runlength"}), \
F(type_for, {"type", "for"}), \
F(type_delta_for, {"type", "delta_for"}), \
F(type_lz4, {"type", "lz4"})) \
M(tiflash_storage_pack_compression_bytes, \
"The uncompression/compression bytes of lz4 and lightweight", \
Counter, \
F(type_lz4_compressed_bytes, {"type", "lz4_compressed_bytes"}), \
F(type_lz4_uncompressed_bytes, {"type", "lz4_uncompressed_bytes"}), \
F(type_lightweight_compressed_bytes, {"type", "lightweight_compressed_bytes"}), \
F(type_lightweight_uncompressed_bytes, {"type", "lightweight_uncompressed_bytes"}))


/// Buckets with boundaries [start * base^0, start * base^1, ..., start * base^(size-1)]
Expand Down
1 change: 0 additions & 1 deletion dbms/src/DataStreams/MergeSortingBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#include <DataStreams/SortHelper.h>
#include <DataStreams/copyData.h>
#include <IO/Buffer/WriteBufferFromFile.h>
#include <IO/Compression/CompressedWriteBuffer.h>
#include <common/logger_useful.h>

namespace DB
Expand Down
16 changes: 7 additions & 9 deletions dbms/src/IO/Compression/CompressionCodecDeltaFOR.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#include <IO/Compression/CompressionCodecDeltaFOR.h>
#include <IO/Compression/CompressionCodecFOR.h>
#include <IO/Compression/CompressionInfo.h>
#include <IO/Compression/CompressionSettings.h>
#include <IO/Compression/EncodingUtil.h>
#include <common/likely.h>

Expand Down Expand Up @@ -60,17 +59,16 @@ UInt32 compressData(const char * source, UInt32 source_size, char * dest)
if unlikely (source_size % bytes_size != 0)
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "source size {} is not aligned to {}", source_size, bytes_size);
const auto count = source_size / bytes_size;
DB::Compression::deltaEncoding<T>(reinterpret_cast<const T *>(source), count, reinterpret_cast<T *>(dest));
if (unlikely(count == 1))
return bytes_size;
// Cast deltas to signed type to better compress negative values.
// For example, if we have a sequence of UInt8 values [3, 2, 1, 0], the deltas will be [3, -1, -1, -1]
// If we compress them as UInt8, we will get [3, 255, 255, 255], which is not optimal.
using TS = typename std::make_signed<T>::type;
auto for_size = DB::CompressionCodecFOR::compressData<TS>(
reinterpret_cast<TS *>(dest + bytes_size),
source_size - bytes_size,
dest + bytes_size);
// View the source as signed integers so that the deltas have smaller magnitudes.
DB::Compression::deltaEncoding<TS>(reinterpret_cast<const TS *>(source), count, reinterpret_cast<TS *>(dest));
// Apply ZigZag encoding to the deltas so that every delta becomes a non-negative integer.
auto * deltas = reinterpret_cast<T *>(dest + bytes_size);
DB::Compression::zigZagEncoding<T>(deltas, count - 1, deltas);
// compress deltas using FOR
auto for_size = DB::CompressionCodecFOR::compressData<T>(deltas, source_size - bytes_size, dest + bytes_size);
return bytes_size + for_size;
}

Expand Down
9 changes: 5 additions & 4 deletions dbms/src/IO/Compression/CompressionCodecFOR.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#include <Common/Exception.h>
#include <IO/Compression/CompressionCodecFOR.h>
#include <IO/Compression/CompressionInfo.h>
#include <IO/Compression/CompressionSettings.h>
#include <IO/Compression/EncodingUtil.h>
#include <common/likely.h>

Expand Down Expand Up @@ -60,9 +59,11 @@ UInt32 CompressionCodecFOR::compressData(const T * source, UInt32 source_size, c
if unlikely (count == 0)
throw Exception(ErrorCodes::CANNOT_COMPRESS, "Cannot compress empty data");
std::vector<T> values(source, source + count);
T frame_of_reference = *std::min_element(values.cbegin(), values.cend());
UInt8 width = DB::Compression::FOREncodingWidth(values, frame_of_reference);
return DB::Compression::FOREncoding<T, std::is_signed_v<T>>(values, frame_of_reference, width, dest);
auto minmax = std::minmax_element(values.cbegin(), values.cend());
T frame_of_reference = *minmax.first;
T max_value = *minmax.second;
UInt8 width = BitpackingPrimitives::minimumBitWidth<T>(max_value - frame_of_reference);
return DB::Compression::FOREncoding(values.data(), values.size(), frame_of_reference, width, dest);
}

UInt32 CompressionCodecFOR::doCompressData(const char * source, UInt32 source_size, char * dest) const
Expand Down
11 changes: 8 additions & 3 deletions dbms/src/IO/Compression/CompressionCodecFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <magic_enum.hpp>
#include <shared_mutex>


#if USE_QPL
#include <IO/Compression/CompressionCodecDeflateQpl.h>
#endif
Expand Down Expand Up @@ -77,7 +78,6 @@ template CompressionCodecPtr CompressionCodecFactory::getStaticCodec<Compression
template CompressionCodecPtr CompressionCodecFactory::getStaticCodec<CompressionCodecRunLength>(
const CompressionSetting & setting);


template <>
CompressionCodecPtr CompressionCodecFactory::getStaticCodec<CompressionCodecLZ4>(const CompressionSetting & setting)
{
Expand Down Expand Up @@ -154,7 +154,6 @@ CompressionCodecPtr CompressionCodecFactory::getStaticCodec<CompressionCodecDefl
}
#endif


template <bool IS_COMPRESS>
CompressionCodecPtr CompressionCodecFactory::create(const CompressionSetting & setting)
{
Expand Down Expand Up @@ -182,7 +181,13 @@ CompressionCodecPtr CompressionCodecFactory::create(const CompressionSetting & s
if (!isInteger(setting.data_type))
{
if (setting.method_byte == CompressionMethodByte::Lightweight)
{
// Use LZ4 codec for non-integral types
// TODO: maybe we can use zstd?
auto method = CompressionMethod::LZ4;
CompressionSetting setting(method, CompressionSetting::getDefaultLevel(method));
return getStaticCodec<CompressionCodecLZ4>(setting);
}
else
return nullptr;
}
Expand All @@ -192,7 +197,7 @@ CompressionCodecPtr CompressionCodecFactory::create(const CompressionSetting & s
switch (setting.method_byte)
{
case CompressionMethodByte::Lightweight:
return std::make_unique<CompressionCodecLightweight>(setting.data_type);
return std::make_unique<CompressionCodecLightweight>(setting.data_type, setting.level);
case CompressionMethodByte::DeltaFOR:
return getStaticCodec<CompressionCodecDeltaFOR>(setting);
case CompressionMethodByte::RunLength:
Expand Down
8 changes: 0 additions & 8 deletions dbms/src/IO/Compression/CompressionCodecLZ4.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ namespace ErrorCodes
{
extern const int CANNOT_COMPRESS;
extern const int CANNOT_DECOMPRESS;
extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE;
extern const int ILLEGAL_CODEC_PARAMETER;
} // namespace ErrorCodes

CompressionCodecLZ4::CompressionCodecLZ4(int level_)
Expand Down Expand Up @@ -72,10 +70,4 @@ CompressionCodecLZ4HC::CompressionCodecLZ4HC(int level_)
: CompressionCodecLZ4(level_)
{}


CompressionCodecPtr getCompressionCodecLZ4(int level)
{
return std::make_unique<CompressionCodecLZ4HC>(level);
}

} // namespace DB
6 changes: 6 additions & 0 deletions dbms/src/IO/Compression/CompressionCodecLZ4.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,14 @@
namespace DB
{

class CompressionCodecFactory;

class CompressionCodecLZ4 : public ICompressionCodec
{
public:
// The official LZ4 documentation reports a compression ratio of about 2.1, see https://github.com/lz4/lz4
static constexpr size_t ESTIMATE_INTEGER_COMPRESSION_RATIO = 3;

explicit CompressionCodecLZ4(int level_);

UInt8 getMethodByte() const override;
Expand All @@ -39,6 +44,7 @@ class CompressionCodecLZ4 : public ICompressionCodec

protected:
const int level;
friend class CompressionCodecFactory;
};


Expand Down
12 changes: 3 additions & 9 deletions dbms/src/IO/Compression/CompressionCodecLightweight.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@
namespace DB
{

// TODO: metrics

namespace ErrorCodes
{
extern const int CANNOT_COMPRESS;
extern const int CANNOT_DECOMPRESS;
} // namespace ErrorCodes

CompressionCodecLightweight::CompressionCodecLightweight(CompressionDataType data_type_)
: data_type(data_type_)
CompressionCodecLightweight::CompressionCodecLightweight(CompressionDataType data_type_, int level_)
: ctx(level_)
, data_type(data_type_)
{}

UInt8 CompressionCodecLightweight::getMethodByte() const
Expand All @@ -46,12 +46,6 @@ UInt32 CompressionCodecLightweight::getMaxCompressedDataSize(UInt32 uncompressed
return 1 + 1 + LZ4_COMPRESSBOUND(uncompressed_size);
}

CompressionCodecLightweight::~CompressionCodecLightweight()
{
if (ctx.isCompression())
LOG_INFO(Logger::get(), "lightweight codec: {}", ctx.toDebugString());
}

UInt32 CompressionCodecLightweight::doCompressData(const char * source, UInt32 source_size, char * dest) const
{
dest[0] = magic_enum::enum_integer(data_type);
Expand Down
77 changes: 35 additions & 42 deletions dbms/src/IO/Compression/CompressionCodecLightweight.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ namespace DB
class CompressionCodecLightweight : public ICompressionCodec
{
public:
explicit CompressionCodecLightweight(CompressionDataType data_type_);
explicit CompressionCodecLightweight(CompressionDataType data_type_, int level_);

UInt8 getMethodByte() const override;

bool isCompression() const override { return true; }
~CompressionCodecLightweight() override = default;

~CompressionCodecLightweight() override;
bool isCompression() const override { return true; }

protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
Expand All @@ -55,86 +55,79 @@ class CompressionCodecLightweight : public ICompressionCodec
enum class IntegerMode : UInt8
{
Invalid = 0,
CONSTANT = 1, // all values are the same
CONSTANT_DELTA = 2, // the difference between two adjacent values is the same
RunLength = 3, // run-length encoding
Constant = 1, // all values are the same
ConstantDelta = 2, // the difference between two adjacent values is the same
RunLength = 3, // the same value appears multiple times
FOR = 4, // Frame of Reference encoding
DELTA_FOR = 5, // delta encoding and then FOR encoding
DeltaFOR = 5, // delta encoding and then FOR encoding
LZ4 = 6, // the above modes are not suitable, use LZ4 instead
};

// Constant or ConstantDelta
template <typename T>
template <std::integral T>
using ConstantState = T;

template <typename T>
using RunLengthState = std::vector<std::pair<T, UInt8>>;

template <typename T>
template <std::integral T>
struct FORState
{
std::vector<T> values;
T min_value;
UInt8 bit_width;
};

template <typename T>
template <std::integral T>
struct DeltaFORState
{
using TS = typename std::make_signed_t<T>;
std::vector<TS> deltas;
TS min_delta_value;
std::vector<T> deltas;
T min_delta_value;
UInt8 bit_width;
};

// State is a union of different states for different modes
template <typename T>
using IntegerState = std::variant<ConstantState<T>, RunLengthState<T>, FORState<T>, DeltaFORState<T>>;
template <std::integral T>
using IntegerState = std::variant<ConstantState<T>, FORState<T>, DeltaFORState<T>>;

class IntegerCompressContext
{
public:
IntegerCompressContext() = default;
explicit IntegerCompressContext(int round_count_)
: round_count(round_count_)
{}

template <typename T>
template <std::integral T>
void analyze(std::span<const T> & values, IntegerState<T> & state);

void update(size_t uncompressed_size, size_t compressed_size);

String toDebugString() const;
bool isCompression() const { return lz4_counter > 0 || lw_counter > 0; }

IntegerMode mode = IntegerMode::LZ4;

private:
bool needAnalyze() const;

template <std::integral T>
bool needAnalyzeDelta() const;

template <std::integral T>
static constexpr bool needAnalyzeFOR();

bool needAnalyzeRunLength() const;

void resetIfNeed();

private:
// The threshold for the number of blocks to decide whether need to analyze.
// For example:
// If lz4 is used more than COUNT_THRESHOLD times and the compression ratio is better than lightweight codec, do not analyze anymore.
static constexpr size_t COUNT_THRESHOLD = 5;
// Assume that the compression ratio of LZ4 is 3.0
// The official document says that the compression ratio of LZ4 is 2.1, https://github.com/lz4/lz4
static constexpr size_t ESRTIMATE_LZ4_COMPRESSION_RATIO = 3;

size_t lw_uncompressed_size = 0;
size_t lw_compressed_size = 0;
size_t lw_counter = 0;
size_t lz4_uncompressed_size = 0;
size_t lz4_compressed_size = 0;
size_t lz4_counter = 0;
size_t constant_delta_counter = 0;
size_t delta_for_counter = 0;
size_t rle_counter = 0;
// Every round_count blocks form one round; whether to analyze the mode is decided per round.
const int round_count;
int compress_count = 0;
bool used_lz4 = false;
bool used_constant_delta = false;
bool used_delta_for = false;
bool used_rle = false;
};

template <typename T>
template <std::integral T>
size_t compressDataForInteger(const char * source, UInt32 source_size, char * dest) const;

template <typename T>
template <std::integral T>
void decompressDataForInteger(const char * source, UInt32 source_size, char * dest, UInt32 output_size) const;

/// Non-integer data
Expand Down
Loading

0 comments on commit 627fd1f

Please sign in to comment.