Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Custom compression codecs #3899

Merged
merged 51 commits into from
Dec 28, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
6d0c4ea
ISSUES-838 add lz4、none、zstd codec
zhang2014 Oct 11, 2018
5c30bd2
Compilable code
alesapin Dec 13, 2018
a492d0f
Seems like working parser
alesapin Dec 13, 2018
d7bc4ae
Seems like first simpliest working code
alesapin Dec 13, 2018
5c823d5
Remove debug
alesapin Dec 13, 2018
96da319
Rewrite strange logic to another strange logic
alesapin Dec 13, 2018
29e2bba
Merge remote-tracking branch 'origin' into custom_compression_codecs
alesapin Dec 14, 2018
3370a0a
Fix order of calls
alesapin Dec 14, 2018
cc229ac
Remove strange test
alesapin Dec 14, 2018
8a95eb9
More tests for parser
alesapin Dec 14, 2018
0f29bff
Style
alesapin Dec 14, 2018
436845f
Debuging codec multiple
alesapin Dec 17, 2018
36a6121
Merge branch 'master' into custom_compression_codecs
alesapin Dec 17, 2018
6fe01af
Merge branch 'master' into custom_compression_codecs
alesapin Dec 18, 2018
fd49cc3
Simpliest codec multiple works
alesapin Dec 18, 2018
bb4eb06
Fix strange logic
alesapin Dec 18, 2018
60456eb
More tests on compression codecs
alesapin Dec 18, 2018
3d0da64
one more test
alesapin Dec 18, 2018
314fbbd
Remove some debug output
alesapin Dec 18, 2018
bc0e0c2
Add level test
alesapin Dec 18, 2018
9b0d47c
Add zstd params validation
alesapin Dec 18, 2018
512fe3c
Refactoring (sanitizer failure)
alesapin Dec 19, 2018
04902c5
Non effective, but working code
alesapin Dec 20, 2018
0ae14be
Fix codes
alesapin Dec 20, 2018
aee6c0a
Remove dump code
alesapin Dec 20, 2018
5b23210
Merge branch 'master' into custom_compression_codecs
alesapin Dec 21, 2018
7c11455
Remove compression settings
alesapin Dec 21, 2018
6f73338
Move header with compression info to another place
alesapin Dec 21, 2018
42b9735
Add LZ4HC
alesapin Dec 21, 2018
74ea831
Remove commented codec
alesapin Dec 21, 2018
a0240d8
Remove accident change
alesapin Dec 21, 2018
1e771a4
Fix error codes
alesapin Dec 24, 2018
29ad7f9
Remove redundant changes and shift error codes
alesapin Dec 24, 2018
f2fd1e4
Fix style and includes
alesapin Dec 24, 2018
36fb2d1
Add test with non default config
alesapin Dec 24, 2018
f991232
Add custom network compression settings to test
alesapin Dec 24, 2018
22b9969
Merge branch 'master' into custom_compression_codecs
alesapin Dec 24, 2018
f7e06cf
Remove random changes
alesapin Dec 24, 2018
bbdfc77
Update CachedCompressedReadBuffer.h
alesapin Dec 24, 2018
a82dfef
Fix in parser
alesapin Dec 24, 2018
4309775
Add coments and fix compression for nested types
alesapin Dec 25, 2018
c91687f
Small refactoring
alesapin Dec 25, 2018
1228645
Fix segfault and add tabs to ton of tests (desc table)
alesapin Dec 25, 2018
6e8ed0c
Merge branch 'master' into custom_compression_codecs
alesapin Dec 25, 2018
8abcdc7
More tabs in desc table tests
alesapin Dec 25, 2018
8f82d5a
Merge branch 'master' into custom_compression_codecs
alesapin Dec 26, 2018
89966db
Add multiple codecs to compressor
alesapin Dec 26, 2018
31f8294
Update Compressor.cpp
alesapin Dec 26, 2018
9e32de0
Update Compressor.cpp
alesapin Dec 26, 2018
3afa03d
Merge branch 'master' into custom_compression_codecs
alesapin Dec 27, 2018
3ed1e54
Merge branch 'custom_compression_codecs' of github.com:yandex/ClickHo…
alesapin Dec 27, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ set(dbms_sources)
include(${ClickHouse_SOURCE_DIR}/cmake/dbms_glob_sources.cmake)

add_headers_and_sources(clickhouse_common_io src/Common)
add_headers_and_sources(clickhouse_common_io src/Compression)
add_headers_and_sources(clickhouse_common_io src/Common/HashTable)
add_headers_and_sources(clickhouse_common_io src/IO)

Expand Down Expand Up @@ -150,6 +151,7 @@ target_link_libraries (clickhouse_common_io
PUBLIC
common
PRIVATE
clickhouse_parsers
string_utils
widechar_width
${LINK_LIBRARIES_ONLY_ON_X86_64}
Expand Down
49 changes: 39 additions & 10 deletions dbms/programs/compressor/Compressor.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#include <iostream>

#include <optional>
#include <boost/program_options.hpp>

#include <Common/Exception.h>
Expand All @@ -10,12 +10,14 @@
#include <IO/WriteHelpers.h>
#include <IO/copyData.h>

#include <Compression/CompressionFactory.h>

namespace DB
{
namespace ErrorCodes
{
extern const int TOO_LARGE_SIZE_COMPRESSED;
extern const int BAD_ARGUMENTS;
}
}

Expand Down Expand Up @@ -61,7 +63,8 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
("block-size,b", boost::program_options::value<unsigned>()->default_value(DBMS_DEFAULT_BUFFER_SIZE), "compress in blocks of specified size")
("hc", "use LZ4HC instead of LZ4")
("zstd", "use ZSTD instead of LZ4")
("level", boost::program_options::value<int>(), "compression level")
("codec", boost::program_options::value<std::vector<std::string>>()->multitoken(), "use codecs combination instead of LZ4")
("level", boost::program_options::value<std::vector<int>>()->multitoken(), "compression levels for codecs specified via --codec")
("none", "use no compression instead of LZ4")
("stat", "print block statistics of compressed data")
;
Expand All @@ -84,19 +87,45 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
bool stat_mode = options.count("stat");
bool use_none = options.count("none");
unsigned block_size = options["block-size"].as<unsigned>();
std::vector<std::string> codecs;
if (options.count("codec"))
codecs = options["codec"].as<std::vector<std::string>>();

if ((use_lz4hc || use_zstd || use_none) && !codecs.empty())
throw DB::Exception("Wrong options, codec flags like --zstd and --codec options are mutually exclusive", DB::ErrorCodes::BAD_ARGUMENTS);

DB::CompressionMethod method = DB::CompressionMethod::LZ4;
std::string method_family = "LZ4";

if (use_lz4hc)
method = DB::CompressionMethod::LZ4HC;
method_family = "LZ4HC";
else if (use_zstd)
method = DB::CompressionMethod::ZSTD;
method_family = "ZSTD";
else if (use_none)
method = DB::CompressionMethod::NONE;
method_family = "NONE";

std::vector<int> levels;
if (options.count("level"))
levels = options["level"].as<std::vector<int>>();

DB::CompressionCodecPtr codec;
if (!codecs.empty())
{
if (levels.size() > codecs.size())
throw DB::Exception("Specified more levels than codecs", DB::ErrorCodes::BAD_ARGUMENTS);

std::vector<DB::CodecNameWithLevel> codec_names;
for (size_t i = 0; i < codecs.size(); ++i)
{
if (i < levels.size())
codec_names.emplace_back(codecs[i], levels[i]);
else
codec_names.emplace_back(codecs[i], std::nullopt);
}
codec = DB::CompressionCodecFactory::instance().get(codec_names);
}
else
codec = DB::CompressionCodecFactory::instance().get(method_family, levels.empty() ? std::nullopt : std::optional<int>(levels.back()));

DB::CompressionSettings settings(method, options.count("level")
? options["level"].as<int>()
: DB::CompressionSettings::getDefaultLevel(method));

DB::ReadBufferFromFileDescriptor rb(STDIN_FILENO);
DB::WriteBufferFromFileDescriptor wb(STDOUT_FILENO);
Expand All @@ -115,7 +144,7 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
else
{
/// Compression
DB::CompressedWriteBuffer to(wb, settings, block_size);
DB::CompressedWriteBuffer to(wb, codec, block_size);
DB::copyData(rb, to);
}
}
Expand Down
27 changes: 27 additions & 0 deletions dbms/programs/compressor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
## ClickHouse compressor

Simple program for data compression and decompression.

### Examples

Compress data with LZ4:
```
$ ./clickhouse-compressor < input_file > output_file
```

Decompress data from LZ4 format:
```
$ ./clickhouse-compressor --decompress < input_file > output_file
```

Compress data with ZSTD at level 5:

```
$ ./clickhouse-compressor --codec ZSTD --level 5 < input_file > output_file
```

Compress data with a combination of codecs: ZSTD at level 5, LZ4HC at level 7, and LZ4:

```
$ ./clickhouse-compressor --codec ZSTD --level 5 --codec LZ4HC --level 7 --codec LZ4 < input_file > output_file
```
11 changes: 8 additions & 3 deletions dbms/programs/server/TCPHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#include <IO/WriteBufferFromPocoSocket.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/CompressionSettings.h>
#include <IO/copyData.h>
#include <DataStreams/AsynchronousBlockInputStream.h>
#include <DataStreams/NativeBlockInputStream.h>
Expand All @@ -32,6 +31,7 @@
#include <Core/ExternalTable.h>
#include <Storages/ColumnDefault.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Compression/CompressionFactory.h>

#include "TCPHandler.h"

Expand Down Expand Up @@ -728,7 +728,7 @@ bool TCPHandler::receiveData()
{
NamesAndTypesList columns = block.getNamesAndTypesList();
storage = StorageMemory::create(external_table_name,
ColumnsDescription{columns, NamesAndTypesList{}, NamesAndTypesList{}, ColumnDefaults{}, ColumnComments{}});
ColumnsDescription{columns, NamesAndTypesList{}, NamesAndTypesList{}, ColumnDefaults{}, ColumnComments{}, ColumnCodecs{}});
storage->startup();
query_context.addExternalTable(external_table_name, storage);
}
Expand Down Expand Up @@ -772,9 +772,14 @@ void TCPHandler::initBlockOutput(const Block & block)
{
if (!state.maybe_compressed_out)
{
std::string method = query_context.getSettingsRef().network_compression_method;
std::optional<int> level;
if (method == "ZSTD")
level = query_context.getSettingsRef().network_zstd_compression_level;

if (state.compression == Protocol::Compression::Enable)
state.maybe_compressed_out = std::make_shared<CompressedWriteBuffer>(
*out, CompressionSettings(query_context.getSettingsRef()));
*out, CompressionCodecFactory::instance().get(method, level));
else
state.maybe_compressed_out = out;
}
Expand Down
1 change: 1 addition & 0 deletions dbms/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ add_subdirectory (AggregateFunctions)
add_subdirectory (Client)
add_subdirectory (TableFunctions)
add_subdirectory (Formats)
add_subdirectory (Compression)
17 changes: 15 additions & 2 deletions dbms/src/Client/Connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <Common/StringUtils/StringUtils.h>
#include <Common/config_version.h>
#include <Interpreters/ClientInfo.h>
#include <Compression/CompressionFactory.h>

#include <Common/config.h>
#if USE_POCO_NETSSL
Expand Down Expand Up @@ -353,7 +354,19 @@ void Connection::sendQuery(
if (!connected)
connect();

compression_settings = settings ? CompressionSettings(*settings) : CompressionSettings(CompressionMethod::LZ4);
if (settings)
{
std::optional<int> level;
std::string method = settings->network_compression_method;

/// Bad custom logic
if (method == "ZSTD")
level = settings->network_zstd_compression_level;

compression_codec = CompressionCodecFactory::instance().get(method, level);
}
else
compression_codec = CompressionCodecFactory::instance().getDefaultCodec();

query_id = query_id_;

Expand Down Expand Up @@ -426,7 +439,7 @@ void Connection::sendData(const Block & block, const String & name)
if (!block_out)
{
if (compression == Protocol::Compression::Enable)
maybe_compressed_out = std::make_shared<CompressedWriteBuffer>(*out, compression_settings);
maybe_compressed_out = std::make_shared<CompressedWriteBuffer>(*out, compression_codec);
else
maybe_compressed_out = out;

Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Client/Connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/BlockStreamProfileInfo.h>

#include <IO/CompressionSettings.h>
#include <IO/ConnectionTimeouts.h>

#include <Interpreters/Settings.h>
#include <Interpreters/TablesStatus.h>

#include <Compression/ICompressionCodec.h>

#include <atomic>
#include <optional>

Expand Down Expand Up @@ -205,7 +206,7 @@ class Connection : private boost::noncopyable
Protocol::Secure secure; /// Enable data encryption for communication.

/// What compression settings to use while sending data for INSERT queries and external tables.
CompressionSettings compression_settings;
CompressionCodecPtr compression_codec;

/** If not nullptr, used to limit network traffic.
* Only traffic for transferring blocks is accounted. Other packets don't.
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Common/ErrorCodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,9 @@ namespace ErrorCodes
extern const int UNKNOWN_LOG_LEVEL = 428;
extern const int FAILED_TO_GETPWUID = 429;
extern const int MISMATCHING_USERS_FOR_PROCESS_AND_DATA = 430;
extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE = 431;
extern const int UNKNOWN_CODEC = 432;
extern const int ILLEGAL_CODEC_PARAMETER = 433;

extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;
Expand Down
Empty file.
99 changes: 99 additions & 0 deletions dbms/src/Compression/CompressionCodecLZ4.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
#include <Compression/CompressionCodecLZ4.h>
#include <lz4.h>
#include <lz4hc.h>
#include <Compression/CompressionInfo.h>
#include <Compression/CompressionFactory.h>
#include <IO/LZ4_decompress_faster.h>
#include "CompressionCodecLZ4.h"
#include <Parsers/IAST.h>
#include <Parsers/ASTLiteral.h>



namespace DB
{

/// Error codes available to the codecs in this file.
/// These are declarations only (extern, no initializer); the values are defined in another translation unit.
namespace ErrorCodes
{
extern const int CANNOT_COMPRESS;
extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE;
extern const int ILLEGAL_CODEC_PARAMETER;
}


/// Returns the method identifier of this codec: CompressionMethodByte::LZ4 converted to UInt8.
/// The same byte is passed to the factory when the "LZ4" codec is registered.
UInt8 CompressionCodecLZ4::getMethodByte() const
{
    const auto method = CompressionMethodByte::LZ4;
    return static_cast<UInt8>(method);
}

/// Returns the textual description of this codec, which is always "LZ4".
String CompressionCodecLZ4::getCodecDesc() const
{
    String description("LZ4");
    return description;
}

/// Returns the value of lz4's LZ4_COMPRESSBOUND macro for `uncompressed_size`.
/// doCompressData passes the same macro value to LZ4 as the destination capacity.
UInt32 CompressionCodecLZ4::getCompressedDataSize(UInt32 uncompressed_size) const
{
    const UInt32 compress_bound = LZ4_COMPRESSBOUND(uncompressed_size);
    return compress_bound;
}

/// Compresses `source_size` bytes from `source` into `dest` with LZ4_compress_default.
/// The destination capacity given to LZ4 is LZ4_COMPRESSBOUND(source_size).
/// Returns the number of bytes written to `dest`.
/// Throws Exception with ErrorCodes::CANNOT_COMPRESS if LZ4 reports a failure, instead of
/// returning that failure value as if it were a valid compressed size
/// (same policy as CompressionCodecLZ4HC::doCompressData).
UInt32 CompressionCodecLZ4::doCompressData(const char * source, UInt32 source_size, char * dest) const
{
    const int compressed_size = LZ4_compress_default(source, dest, source_size, LZ4_COMPRESSBOUND(source_size));

    /// A non-positive result is not a size: LZ4 could not compress the block.
    if (compressed_size <= 0)
        throw Exception("Cannot LZ4_compress_default", ErrorCodes::CANNOT_COMPRESS);

    return compressed_size;
}

/// Decompresses `source_size` bytes from `source` into `dest` by delegating to LZ4::decompress.
/// `uncompressed_size` is forwarded unchanged, together with `lz4_stat`
/// (not declared in this function; presumably a member of the codec — TODO confirm).
/// NOTE(review): presumably `dest` must have room for `uncompressed_size` bytes — confirm against LZ4::decompress.
void CompressionCodecLZ4::doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const
{
LZ4::decompress(source, dest, source_size, uncompressed_size, lz4_stat);
}

/// Registers the parameterless "LZ4" codec in `factory` under the name "LZ4"
/// and the method byte CompressionMethodByte::LZ4.
/// The creator returns a new CompressionCodecLZ4 on every call.
void registerCodecLZ4(CompressionCodecFactory & factory)
{
    const auto method_byte = static_cast<UInt8>(CompressionMethodByte::LZ4);

    /// The creator is handed over to the factory, so it must not capture locals of this function
    /// by reference; it needs no captures at all.
    factory.registerSimpleCompressionCodec("LZ4", method_byte, []
    {
        return std::make_shared<CompressionCodecLZ4>();
    });
}


/// Returns the textual description of this codec, which is always "LZ4HC".
String CompressionCodecLZ4HC::getCodecDesc() const
{
    String description("LZ4HC");
    return description;
}

/// Compresses `source_size` bytes from `source` into `dest` with LZ4_compress_HC at the codec's `level`.
/// The destination capacity given to LZ4 is LZ4_COMPRESSBOUND(source_size).
/// Returns the value reported by LZ4_compress_HC.
/// Throws Exception with ErrorCodes::CANNOT_COMPRESS if that value is zero.
UInt32 CompressionCodecLZ4HC::doCompressData(const char * source, UInt32 source_size, char * dest) const
{
    const auto dest_capacity = LZ4_COMPRESSBOUND(source_size);
    const auto compressed_size = LZ4_compress_HC(source, dest, source_size, dest_capacity, level);

    if (compressed_size == 0)
        throw Exception("Cannot LZ4_compress_HC", ErrorCodes::CANNOT_COMPRESS);

    return compressed_size;
}

/// Registers the "LZ4HC" codec in `factory`. No method byte is supplied (`{}`).
///
/// The creator accepts an optional argument list:
///  - no arguments (null or empty `arguments`): the codec is created with level 0;
///  - exactly one argument: it must be a literal holding a UInt64, which becomes the level;
///  - more than one argument: throws Exception with ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE;
///  - an argument that is not a literal node: throws Exception with ErrorCodes::ILLEGAL_CODEC_PARAMETER.
void registerCodecLZ4HC(CompressionCodecFactory & factory)
{
    /// The creator is handed over to the factory, so it must not capture locals of this function
    /// by reference; it needs no captures at all.
    factory.registerCompressionCodec("LZ4HC", {}, [](const ASTPtr & arguments) -> CompressionCodecPtr
    {
        int level = 0;

        if (arguments && !arguments->children.empty())
        {
            /// Bind by reference: copying the list would copy every child pointer for no reason.
            const auto & children = arguments->children;

            if (children.size() > 1)
                throw Exception("LZ4HC codec must have 1 parameter, given " + std::to_string(children.size()), ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE);

            /// Checked downcast: a non-literal argument must be rejected, not reinterpreted.
            /// NOTE(review): relies on the AST node base class being polymorphic and RTTI being enabled — confirm.
            const auto * literal = dynamic_cast<const ASTLiteral *>(children[0].get());
            if (!literal)
                throw Exception("LZ4HC codec argument must be integer", ErrorCodes::ILLEGAL_CODEC_PARAMETER);

            level = literal->value.safeGet<UInt64>();
        }

        return std::make_shared<CompressionCodecLZ4HC>(level);
    });
}

/// Creates an LZ4HC codec with compression level `level_`.
/// The value is stored as-is in `level`; doCompressData later passes it to LZ4_compress_HC.
CompressionCodecLZ4HC::CompressionCodecLZ4HC(int level_)
: level(level_)
{
}

}

Loading