Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compression: Support enabling lightweight compression #9312

Merged
merged 12 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .clangd
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
CompileFlags:
Add: -ferror-limit=0
Add:
- -ferror-limit=0
- -Wno-vla-cxx-extension
18 changes: 17 additions & 1 deletion dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -867,7 +867,23 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
M(tiflash_read_thread_internal_us, \
"Durations of read thread internal components", \
Histogram, \
F(type_block_queue_pop_latency, {{"type", "block_queue_pop_latency"}}, ExpBuckets{1, 2, 20}))
F(type_block_queue_pop_latency, {{"type", "block_queue_pop_latency"}}, ExpBuckets{1, 2, 20})) \
M(tiflash_storage_pack_compression_algorithm_count, \
"The count of the compression algorithm used by each data part", \
Counter, \
F(type_constant, {"type", "constant"}), \
F(type_constant_delta, {"type", "constant_delta"}), \
F(type_runlength, {"type", "runlength"}), \
F(type_for, {"type", "for"}), \
F(type_delta_for, {"type", "delta_for"}), \
F(type_lz4, {"type", "lz4"})) \
M(tiflash_storage_pack_compression_bytes, \
"The uncompression/compression bytes of lz4 and lightweight", \
Counter, \
F(type_lz4_compressed_bytes, {"type", "lz4_compressed_bytes"}), \
F(type_lz4_uncompressed_bytes, {"type", "lz4_uncompressed_bytes"}), \
F(type_lightweight_compressed_bytes, {"type", "lightweight_compressed_bytes"}), \
F(type_lightweight_uncompressed_bytes, {"type", "lightweight_uncompressed_bytes"}))


/// Buckets with boundaries [start * base^0, start * base^1, ..., start * base^(size-1)]
Expand Down
1 change: 0 additions & 1 deletion dbms/src/DataStreams/MergeSortingBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#include <DataStreams/SortHelper.h>
#include <DataStreams/copyData.h>
#include <IO/Buffer/WriteBufferFromFile.h>
#include <IO/Compression/CompressedWriteBuffer.h>
#include <common/logger_useful.h>

namespace DB
Expand Down
16 changes: 7 additions & 9 deletions dbms/src/IO/Compression/CompressionCodecDeltaFOR.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#include <IO/Compression/CompressionCodecDeltaFOR.h>
#include <IO/Compression/CompressionCodecFOR.h>
#include <IO/Compression/CompressionInfo.h>
#include <IO/Compression/CompressionSettings.h>
#include <IO/Compression/EncodingUtil.h>
#include <common/likely.h>

Expand Down Expand Up @@ -60,17 +59,16 @@ UInt32 compressData(const char * source, UInt32 source_size, char * dest)
if unlikely (source_size % bytes_size != 0)
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "source size {} is not aligned to {}", source_size, bytes_size);
const auto count = source_size / bytes_size;
DB::Compression::deltaEncoding<T>(reinterpret_cast<const T *>(source), count, reinterpret_cast<T *>(dest));
if (unlikely(count == 1))
return bytes_size;
// Cast deltas to signed type to better compress negative values.
// For example, if we have a sequence of UInt8 values [3, 2, 1, 0], the deltas will be [3, -1, -1, -1]
// If we compress them as UInt8, we will get [3, 255, 255, 255], which is not optimal.
using TS = typename std::make_signed<T>::type;
auto for_size = DB::CompressionCodecFOR::compressData<TS>(
reinterpret_cast<TS *>(dest + bytes_size),
source_size - bytes_size,
dest + bytes_size);
// view the source as signed integers so that the deltas are smaller in magnitude
DB::Compression::deltaEncoding<TS>(reinterpret_cast<const TS *>(source), count, reinterpret_cast<TS *>(dest));
// apply ZigZag encoding to the deltas so that they become non-negative integers
auto * deltas = reinterpret_cast<T *>(dest + bytes_size);
DB::Compression::zigZagEncoding<T>(deltas, count - 1, deltas);
// compress deltas using FOR
auto for_size = DB::CompressionCodecFOR::compressData<T>(deltas, source_size - bytes_size, dest + bytes_size);
return bytes_size + for_size;
}

Expand Down
9 changes: 5 additions & 4 deletions dbms/src/IO/Compression/CompressionCodecFOR.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#include <Common/Exception.h>
#include <IO/Compression/CompressionCodecFOR.h>
#include <IO/Compression/CompressionInfo.h>
#include <IO/Compression/CompressionSettings.h>
#include <IO/Compression/EncodingUtil.h>
#include <common/likely.h>

Expand Down Expand Up @@ -60,9 +59,11 @@ UInt32 CompressionCodecFOR::compressData(const T * source, UInt32 source_size, c
if unlikely (count == 0)
throw Exception(ErrorCodes::CANNOT_COMPRESS, "Cannot compress empty data");
std::vector<T> values(source, source + count);
T frame_of_reference = *std::min_element(values.cbegin(), values.cend());
UInt8 width = DB::Compression::FOREncodingWidth(values, frame_of_reference);
return DB::Compression::FOREncoding<T, std::is_signed_v<T>>(values, frame_of_reference, width, dest);
auto minmax = std::minmax_element(values.cbegin(), values.cend());
T frame_of_reference = *minmax.first;
T max_value = *minmax.second;
UInt8 width = BitpackingPrimitives::minimumBitWidth<T>(max_value - frame_of_reference);
return DB::Compression::FOREncoding(values.data(), values.size(), frame_of_reference, width, dest);
}

UInt32 CompressionCodecFOR::doCompressData(const char * source, UInt32 source_size, char * dest) const
Expand Down
11 changes: 8 additions & 3 deletions dbms/src/IO/Compression/CompressionCodecFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <magic_enum.hpp>
#include <shared_mutex>


#if USE_QPL
#include <IO/Compression/CompressionCodecDeflateQpl.h>
#endif
Expand Down Expand Up @@ -77,7 +78,6 @@ template CompressionCodecPtr CompressionCodecFactory::getStaticCodec<Compression
template CompressionCodecPtr CompressionCodecFactory::getStaticCodec<CompressionCodecRunLength>(
const CompressionSetting & setting);


template <>
CompressionCodecPtr CompressionCodecFactory::getStaticCodec<CompressionCodecLZ4>(const CompressionSetting & setting)
{
Expand Down Expand Up @@ -154,7 +154,6 @@ CompressionCodecPtr CompressionCodecFactory::getStaticCodec<CompressionCodecDefl
}
#endif


template <bool IS_COMPRESS>
CompressionCodecPtr CompressionCodecFactory::create(const CompressionSetting & setting)
{
Expand Down Expand Up @@ -182,7 +181,13 @@ CompressionCodecPtr CompressionCodecFactory::create(const CompressionSetting & s
if (!isInteger(setting.data_type))
{
if (setting.method_byte == CompressionMethodByte::Lightweight)
{
// Use LZ4 codec for non-integral types
// TODO: maybe we can use zstd?
auto method = CompressionMethod::LZ4;
CompressionSetting setting(method, CompressionSetting::getDefaultLevel(method));
return getStaticCodec<CompressionCodecLZ4>(setting);
}
else
return nullptr;
}
Expand All @@ -192,7 +197,7 @@ CompressionCodecPtr CompressionCodecFactory::create(const CompressionSetting & s
switch (setting.method_byte)
{
case CompressionMethodByte::Lightweight:
return std::make_unique<CompressionCodecLightweight>(setting.data_type);
return std::make_unique<CompressionCodecLightweight>(setting.data_type, setting.level);
case CompressionMethodByte::DeltaFOR:
return getStaticCodec<CompressionCodecDeltaFOR>(setting);
case CompressionMethodByte::RunLength:
Expand Down
8 changes: 0 additions & 8 deletions dbms/src/IO/Compression/CompressionCodecLZ4.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ namespace ErrorCodes
{
extern const int CANNOT_COMPRESS;
extern const int CANNOT_DECOMPRESS;
extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE;
extern const int ILLEGAL_CODEC_PARAMETER;
} // namespace ErrorCodes

CompressionCodecLZ4::CompressionCodecLZ4(int level_)
Expand Down Expand Up @@ -72,10 +70,4 @@ CompressionCodecLZ4HC::CompressionCodecLZ4HC(int level_)
: CompressionCodecLZ4(level_)
{}


/// Creates a CompressionCodecLZ4HC configured with the given compression `level`
/// and hands ownership of it back to the caller.
CompressionCodecPtr getCompressionCodecLZ4(int level)
{
    CompressionCodecPtr codec = std::make_unique<CompressionCodecLZ4HC>(level);
    return codec;
}

} // namespace DB
6 changes: 6 additions & 0 deletions dbms/src/IO/Compression/CompressionCodecLZ4.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,14 @@
namespace DB
{

class CompressionCodecFactory;

class CompressionCodecLZ4 : public ICompressionCodec
{
public:
// The official LZ4 documentation reports a compression ratio of about 2.1 (https://github.com/lz4/lz4); 3 is used here as the estimate for integer data.
static constexpr size_t ESTIMATE_INTEGER_COMPRESSION_RATIO = 3;

explicit CompressionCodecLZ4(int level_);

UInt8 getMethodByte() const override;
Expand All @@ -39,6 +44,7 @@ class CompressionCodecLZ4 : public ICompressionCodec

protected:
const int level;
friend class CompressionCodecFactory;
};


Expand Down
12 changes: 3 additions & 9 deletions dbms/src/IO/Compression/CompressionCodecLightweight.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@
namespace DB
{

// TODO: metrics

namespace ErrorCodes
{
extern const int CANNOT_COMPRESS;
extern const int CANNOT_DECOMPRESS;
} // namespace ErrorCodes

CompressionCodecLightweight::CompressionCodecLightweight(CompressionDataType data_type_)
: data_type(data_type_)
CompressionCodecLightweight::CompressionCodecLightweight(CompressionDataType data_type_, int level_)
: ctx(level_)
, data_type(data_type_)
{}

UInt8 CompressionCodecLightweight::getMethodByte() const
Expand All @@ -46,12 +46,6 @@ UInt32 CompressionCodecLightweight::getMaxCompressedDataSize(UInt32 uncompressed
return 1 + 1 + LZ4_COMPRESSBOUND(uncompressed_size);
}

/// On destruction, logs the context's debug string, but only when
/// ctx.isCompression() reports true; otherwise nothing is logged.
CompressionCodecLightweight::~CompressionCodecLightweight()
{
    if (!ctx.isCompression())
        return;

    LOG_INFO(Logger::get(), "lightweight codec: {}", ctx.toDebugString());
}

UInt32 CompressionCodecLightweight::doCompressData(const char * source, UInt32 source_size, char * dest) const
{
dest[0] = magic_enum::enum_integer(data_type);
Expand Down
77 changes: 35 additions & 42 deletions dbms/src/IO/Compression/CompressionCodecLightweight.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ namespace DB
class CompressionCodecLightweight : public ICompressionCodec
{
public:
explicit CompressionCodecLightweight(CompressionDataType data_type_);
explicit CompressionCodecLightweight(CompressionDataType data_type_, int level_);

UInt8 getMethodByte() const override;

bool isCompression() const override { return true; }
~CompressionCodecLightweight() override = default;

~CompressionCodecLightweight() override;
bool isCompression() const override { return true; }

protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
Expand All @@ -55,86 +55,79 @@ class CompressionCodecLightweight : public ICompressionCodec
enum class IntegerMode : UInt8
{
Invalid = 0,
CONSTANT = 1, // all values are the same
CONSTANT_DELTA = 2, // the difference between two adjacent values is the same
RunLength = 3, // run-length encoding
Constant = 1, // all values are the same
ConstantDelta = 2, // the difference between two adjacent values is the same
RunLength = 3, // the same value appears multiple times
FOR = 4, // Frame of Reference encoding
DELTA_FOR = 5, // delta encoding and then FOR encoding
DeltaFOR = 5, // delta encoding and then FOR encoding
LZ4 = 6, // the above modes are not suitable, use LZ4 instead
};

// Constant or ConstantDelta
template <typename T>
template <std::integral T>
using ConstantState = T;

template <typename T>
using RunLengthState = std::vector<std::pair<T, UInt8>>;

template <typename T>
template <std::integral T>
struct FORState
{
std::vector<T> values;
T min_value;
UInt8 bit_width;
};

template <typename T>
template <std::integral T>
struct DeltaFORState
{
using TS = typename std::make_signed_t<T>;
std::vector<TS> deltas;
TS min_delta_value;
std::vector<T> deltas;
T min_delta_value;
UInt8 bit_width;
};

// State is a union of different states for different modes
template <typename T>
using IntegerState = std::variant<ConstantState<T>, RunLengthState<T>, FORState<T>, DeltaFORState<T>>;
template <std::integral T>
using IntegerState = std::variant<ConstantState<T>, FORState<T>, DeltaFORState<T>>;

class IntegerCompressContext
{
public:
IntegerCompressContext() = default;
explicit IntegerCompressContext(int round_count_)
: round_count(round_count_)
{}

template <typename T>
template <std::integral T>
void analyze(std::span<const T> & values, IntegerState<T> & state);

void update(size_t uncompressed_size, size_t compressed_size);

String toDebugString() const;
bool isCompression() const { return lz4_counter > 0 || lw_counter > 0; }

IntegerMode mode = IntegerMode::LZ4;

private:
bool needAnalyze() const;

template <std::integral T>
bool needAnalyzeDelta() const;

template <std::integral T>
static constexpr bool needAnalyzeFOR();

bool needAnalyzeRunLength() const;

void resetIfNeed();

private:
// Threshold on the number of blocks, used to decide whether analysis is still needed.
// For example:
// If lz4 has been used more than COUNT_THRESHOLD times and its compression ratio is better than that of the lightweight codec, stop analyzing.
static constexpr size_t COUNT_THRESHOLD = 5;
// Assume that the compression ratio of LZ4 is 3.0
// The official document says that the compression ratio of LZ4 is 2.1, https://github.com/lz4/lz4
static constexpr size_t ESRTIMATE_LZ4_COMPRESSION_RATIO = 3;

size_t lw_uncompressed_size = 0;
size_t lw_compressed_size = 0;
size_t lw_counter = 0;
size_t lz4_uncompressed_size = 0;
size_t lz4_compressed_size = 0;
size_t lz4_counter = 0;
size_t constant_delta_counter = 0;
size_t delta_for_counter = 0;
size_t rle_counter = 0;
// Blocks are grouped into rounds of round_count blocks; whether to analyze the mode is decided per round.
const int round_count;
int compress_count = 0;
bool used_lz4 = false;
bool used_constant_delta = false;
bool used_delta_for = false;
bool used_rle = false;
};

template <typename T>
template <std::integral T>
size_t compressDataForInteger(const char * source, UInt32 source_size, char * dest) const;

template <typename T>
template <std::integral T>
void decompressDataForInteger(const char * source, UInt32 source_size, char * dest, UInt32 output_size) const;

/// Non-integer data
Expand Down
Loading